下面列出了怎么用 io.netty.handler.codec.http.websocketx.WebSocketVersion 的API类实例代码及写法,或者点击链接到github查看源代码。
private IpcdClientHandler createClientHandler(java.net.URI uri) {
final IpcdClientHandler handler =
new IpcdClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()), statusCallback);
handler.setDownloadHandler(new DownloadHandler(this));
handler.setFactoryResetHandler(new FactoryResetHandler(this));
handler.setLeaveHandler(new LeaveHandler(this));
handler.setRebootHandler(new RebootHandler(this));
handler.setGetDeviceInfoHandler(new GetDeviceInfoHandler(this));
handler.setGetEventConfigurationHandler(new GetEventConfigurationHandler(this));
handler.setGetParameterInfoHandler(new GetParameterInfoHandler(this));
handler.setGetParameterValuesHandler(new GetParameterValuesHandler(this));
handler.setGetReportConfigurationHandler(new GetReportConfigurationHandler(this));
handler.setSetDeviceInfoHandler(new SetDeviceInfoHandler(this));
handler.setSetEventConfigurationHandler(new SetEventConfigurationHandler(this));
handler.setSetParameterValuesHandler(new SetParameterValuesHandler(this));
handler.setSetReportConfigurationHandler(new SetReportConfigurationHandler(this));
return handler;
}
WebsocketClientOperations(URI currentURI,
WebsocketClientSpec websocketClientSpec,
HttpClientOperations replaced) {
super(replaced);
this.proxyPing = websocketClientSpec.handlePing();
Channel channel = channel();
onCloseState = MonoProcessor.create();
String subprotocols = websocketClientSpec.protocols();
handshaker = WebSocketClientHandshakerFactory.newHandshaker(currentURI,
WebSocketVersion.V13,
subprotocols != null && !subprotocols.isEmpty() ? subprotocols : null,
true,
replaced.requestHeaders()
.remove(HttpHeaderNames.HOST),
websocketClientSpec.maxFramePayloadLength());
handshaker.handshake(channel)
.addListener(f -> {
markPersistent(false);
channel.read();
});
}
public WebSocketClient(String host, int port, String path, boolean isSSL) throws Exception {
super(host, port, new Random());
String scheme = isSSL ? "wss://" : "ws://";
URI uri = new URI(scheme + host + ":" + port + path);
if (isSSL) {
sslCtx = SslContextBuilder.forClient().sslProvider(SslProvider.JDK).trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
this.handler = new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()));
}
@Override
public void configure(final ChannelPipeline pipeline) {
final String scheme = connection.getUri().getScheme();
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme))
throw new IllegalStateException("Unsupported scheme (only ws: or wss: supported): " + scheme);
if (!supportsSsl() && "wss".equalsIgnoreCase(scheme))
throw new IllegalStateException("To use wss scheme ensure that enableSsl is set to true in configuration");
final int maxContentLength = cluster.connectionPoolSettings().maxContentLength;
handler = new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
connection.getUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength));
pipeline.addLast("http-codec", new HttpClientCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
pipeline.addLast("ws-handler", handler);
pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder);
pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder);
}
/**
* websocket协议支持
*/
private void appendWebsocketCodec(ChannelPipeline pipeline) throws URISyntaxException {
// websocket 解码流程
URI uri = new URI(websocketUrl);
pipeline.addLast(new WebSocketClientProtocolHandler(uri, WebSocketVersion.V13,
null, true, new DefaultHttpHeaders(), sessionConfig.maxFrameLength()));
pipeline.addLast(new BinaryWebSocketFrameToBytesDecoder());
// websocket 编码流程
// Web socket clients must set this to true to mask payload.
// Server implementations must set this to false.
pipeline.addLast(new WebSocket13FrameEncoder(true));
// 将ByteBuf转换为websocket二进制帧
pipeline.addLast(new BytesToBinaryWebSocketFrameEncoder());
}
public void open() throws Exception {
//System.out.println("WebSocket Client connecting");
webSocketClientHandler =
new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 12800000), this);
ch = bootstrap.connect(uri.getHost(), port).sync().channel();
webSocketClientHandler.handshakeFuture().sync();
}
public void openAsync(Runnable onConnect) {
//System.out.println("WebSocket Client connecting");
webSocketClientHandler =
new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 12800000), this);
ChannelFuture future = bootstrap.connect(uri.getHost(), port);
future.addListener((e) -> {
ch = future.channel();
webSocketClientHandler.handshakeFuture().addListener((e1) -> onConnect.run());
});
}
/**
* Creates an instance of {@link WebSocketClientHandler} with {@link AwsSigV4ClientHandshaker} as the handshaker
* for SigV4 auth.
* @return the instance of clientHandler.
*/
private WebSocketClientHandler createHandler() {
HandshakeRequestConfig handshakeRequestConfig =
HandshakeRequestConfig.parse(cluster.authProperties().get(AuthProperties.Property.JAAS_ENTRY));
WebSocketClientHandshaker handshaker = new LBAwareAwsSigV4ClientHandshaker(
connection.getUri(),
WebSocketVersion.V13,
null,
false,
EmptyHttpHeaders.INSTANCE,
cluster.getMaxContentLength(),
new ChainedSigV4PropertiesProvider(),
handshakeRequestConfig);
return new WebSocketClientHandler(handshaker);
}
WebSocketClientHandler(
URI uri, String userAgent, WebsocketConnection.WSClientEventHandler delegate) {
this.delegate = checkNotNull(delegate, "delegate must not be null");
checkArgument(!Strings.isNullOrEmpty(userAgent), "user agent must not be null or empty");
this.handshaker = WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true,
new DefaultHttpHeaders().add("User-Agent", userAgent));
}
/**
* 通道注册的时候配置websocket解码handler
*/
@Override
protected final void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline=ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc(),host,port));
}
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64*1024));
pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory.newHandshaker(webSocketURL, WebSocketVersion.V13, subprotocol, false, new DefaultHttpHeaders())));
pipeline.addLast(new WebSocketConnectedClientHandler());//连接成功监听handler
}
@Before
public void setup() throws Exception {
s = new Server(conf);
s.run();
Connector con = mac.getConnector("root", "secret");
con.securityOperations().changeUserAuthorizations("root", new Authorizations("A", "B", "C", "D", "E", "F"));
this.sessionId = UUID.randomUUID().toString();
AuthCache.getCache().put(sessionId, token);
group = new NioEventLoopGroup();
SslContext ssl = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
String cookieVal = ClientCookieEncoder.STRICT.encode(Constants.COOKIE_NAME, sessionId);
HttpHeaders headers = new DefaultHttpHeaders();
headers.add(HttpHeaderNames.COOKIE, cookieVal);
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(LOCATION,
WebSocketVersion.V13, (String) null, false, headers);
handler = new ClientHandler(handshaker);
Bootstrap boot = new Bootstrap();
boot.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("ssl", ssl.newHandler(ch.alloc(), "127.0.0.1", WS_PORT));
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new HttpObjectAggregator(8192));
ch.pipeline().addLast(handler);
}
});
ch = boot.connect("127.0.0.1", WS_PORT).sync().channel();
// Wait until handshake is complete
while (!handshaker.isHandshakeComplete()) {
sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
LOG.debug("Waiting for Handshake to complete");
}
}
public WebSocketFrameTransport(final BrokerAdmin brokerAdmin)
{
super(brokerAdmin, BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
URI uri = URI.create(String.format("tcp://%s:%d/",
getBrokerAddress().getHostString(),
getBrokerAddress().getPort()));
_webSocketClientHandler = new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders()));
}
@Before
public void setup() throws Exception {
s = new Server(conf);
s.run();
Connector con = mac.getConnector("root", "secret");
con.securityOperations().changeUserAuthorizations("root", new Authorizations("A", "B", "C", "D", "E", "F"));
this.sessionId = UUID.randomUUID().toString();
AuthCache.put(sessionId, TimelyPrincipal.anonymousPrincipal());
group = new NioEventLoopGroup();
SslContext ssl = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
String cookieVal = ClientCookieEncoder.STRICT.encode(Constants.COOKIE_NAME, sessionId);
HttpHeaders headers = new DefaultHttpHeaders();
headers.add(HttpHeaderNames.COOKIE, cookieVal);
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(LOCATION,
WebSocketVersion.V13, (String) null, false, headers);
handler = new ClientHandler(handshaker);
Bootstrap boot = new Bootstrap();
boot.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("ssl", ssl.newHandler(ch.alloc(), "127.0.0.1", WS_PORT));
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new HttpObjectAggregator(8192));
ch.pipeline().addLast(handler);
}
});
ch = boot.connect("127.0.0.1", WS_PORT).sync().channel();
// Wait until handshake is complete
while (!handshaker.isHandshakeComplete()) {
sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
LOG.debug("Waiting for Handshake to complete");
}
}
static WebSocketVersion getWsVersion(String str) {
switch (str) {
case "0":
return WebSocketVersion.V00;
case "7":
return WebSocketVersion.V07;
case "8":
return WebSocketVersion.V08;
case "13":
return WebSocketVersion.V13;
}
return WebSocketVersion.UNKNOWN;
}
private void handleWebSocketResponse(ChannelHandlerContext ctx, Outgoing out) {
WebSocketHttpResponse response = (WebSocketHttpResponse) out.message;
WebSocketServerHandshaker handshaker = response.handshakerFactory().newHandshaker(lastRequest);
if (handshaker == null) {
HttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.UPGRADE_REQUIRED);
res.headers().set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, WebSocketVersion.V13.toHttpHeaderValue());
HttpUtil.setContentLength(res, 0);
super.unbufferedWrite(ctx, new Outgoing(res, out.promise));
response.subscribe(new CancelledSubscriber<>());
} else {
// First, insert new handlers in the chain after us for handling the websocket
ChannelPipeline pipeline = ctx.pipeline();
HandlerPublisher<WebSocketFrame> publisher = new HandlerPublisher<>(ctx.executor(), WebSocketFrame.class);
HandlerSubscriber<WebSocketFrame> subscriber = new HandlerSubscriber<>(ctx.executor());
pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-subscriber", subscriber);
pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-publisher", publisher);
// Now remove ourselves from the chain
ctx.pipeline().remove(ctx.name());
// Now do the handshake
// Wrap the request in an empty request because we don't need the WebSocket handshaker ignoring the body,
// we already have handled the body.
handshaker.handshake(ctx.channel(), new EmptyHttpRequest(lastRequest));
// And hook up the subscriber/publishers
response.subscribe(subscriber);
publisher.subscribe(response);
}
}
public void openConnection() throws InterruptedException{
Bootstrap bootstrap = new Bootstrap();
final WebSocketClientHandler handler =
new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
mUri, WebSocketVersion.V08, null, false,
new DefaultHttpHeaders()));
bootstrap.group(mGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel socketChannel){
ChannelPipeline channelPipeline =
socketChannel.pipeline();
channelPipeline.addLast(mSslContext.newHandler(
socketChannel.alloc(),
mUri.getHost(),
PORT));
channelPipeline.addLast(new HttpClientCodec(),
new HttpObjectAggregator(8192),
handler);
}
});
mChannel = bootstrap.connect(mUri.getHost(), PORT).sync().channel();
handler.handshakeFuture().sync();
setConnected(Boolean.TRUE);
}
public AppWebSocketClient(String host, int port, String path) throws Exception {
super(host, port, new Random(), new ServerProperties(Collections.emptyMap()));
URI uri = new URI("wss://" + host + ":" + port + path);
this.sslCtx = SslContextBuilder.forClient()
.sslProvider(SslProvider.JDK)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
this.appHandler = new AppWebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()));
}
public WebSocketClient(final URI uri) {
super("ws-client-%d");
final Bootstrap b = new Bootstrap().group(group);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
final String protocol = uri.getScheme();
if (!"ws".equals(protocol))
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
try {
final WebSocketClientHandler wsHandler =
new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 65536));
final MessageSerializer serializer = new GraphBinaryMessageSerializerV1();
b.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel ch) {
final ChannelPipeline p = ch.pipeline();
p.addLast(
new HttpClientCodec(),
new HttpObjectAggregator(65536),
wsHandler,
new WebSocketGremlinRequestEncoder(true, serializer),
new WebSocketGremlinResponseDecoder(serializer),
callbackResponseHandler);
}
});
channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
wsHandler.handshakeFuture().get(10000, TimeUnit.MILLISECONDS);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public NettyWebSocketTransportHandler() {
DefaultHttpHeaders headers = new DefaultHttpHeaders();
getTransportOptions().getHttpHeaders().forEach((key, value) -> {
headers.set(key, value);
});
handshaker = WebSocketClientHandshakerFactory.newHandshaker(
getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL,
true, headers, getMaxFrameSize());
}
@Override
public void addToPipeline(final ChannelPipeline pipeline) {
pipeline.addLast("http-codec", new HttpClientCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(8192));
final WebSocketClientHandshaker handShaker = new WhiteSpaceInPathWebSocketClientHandshaker13(serverUri,
WebSocketVersion.V13, PROTOCOL, false, createHttpHeaders(httpHeaders), Integer.MAX_VALUE);
pipeline.addLast("websocket-protocol-handler", new WebSocketClientProtocolHandler(handShaker));
pipeline.addLast("websocket-frame-codec", new ByteBufToWebSocketFrameCodec());
}
/**
* @return true if the handshake is done properly.
* @throws URISyntaxException throws if there is an error in the URI syntax.
* @throws InterruptedException throws if the connecting the server is interrupted.
*/
public boolean handhshake() throws InterruptedException, URISyntaxException, SSLException, ProtocolException {
boolean isSuccess;
URI uri = new URI(url);
String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
final int port;
if (uri.getPort() == -1) {
if ("ws".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("wss".equalsIgnoreCase(scheme)) {
port = 443;
} else {
port = -1;
}
} else {
port = uri.getPort();
}
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
logger.error("Only WS(S) is supported.");
return false;
}
final boolean ssl = "wss".equalsIgnoreCase(scheme);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
group = new NioEventLoopGroup();
HttpHeaders headers = new DefaultHttpHeaders();
for (Map.Entry<String, String> entry : customHeaders.entrySet()) {
headers.add(entry.getKey(), entry.getValue());
}
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
// If you change it to V00, ping is not supported and remember to change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
handler = new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, subProtocol, true, headers),
latch);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE, handler);
}
});
channel = bootstrap.connect(uri.getHost(), port).sync().channel();
isSuccess = handler.handshakeFuture().sync().isSuccess();
logger.info("WebSocket Handshake successful : " + isSuccess);
return isSuccess;
}
public WebSocketVersion getVersion() {
return version;
}
public WebsocketConnectionBuilder setVersion(WebSocketVersion version) {
this.version = version;
return this;
}
public <R> CompletableFuture<R> connect(Function<Channel, R> connectFunction) {
io.netty.bootstrap.Bootstrap b = new io.netty.bootstrap.Bootstrap();
int actualPort = uri.getPort() == -1 ? (uri.getScheme().equals("wss") ? 443 : 80) : uri.getPort();
final WebSocketClientHandler handler =
new WebSocketClientHandler(
new WebSocketClientHandshaker13(
uri, WebSocketVersion.V13, null, false, HttpHeaders.EMPTY_HEADERS, 1280000) {
@Override
protected FullHttpRequest newHandshakeRequest() {
FullHttpRequest request = super.newHandshakeRequest();
if (clientNegotiation.getSupportedSubProtocols() != null) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < clientNegotiation.getSupportedSubProtocols().size(); ++i) {
if (i > 0) {
sb.append(", ");
}
sb.append(clientNegotiation.getSupportedSubProtocols().get(i));
}
request.headers().add(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, sb.toString());
}
clientNegotiation.beforeRequest(request.headers());
return request;
}
@Override
protected void verify(FullHttpResponse response) {
super.verify(response);
clientNegotiation.afterRequest(response.headers());
}
}, connectFunction);
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (ssl != null) {
SSLEngine sslEngine = ssl.createSSLEngine(uri.getHost(), actualPort);
sslEngine.setUseClientMode(true);
pipeline.addLast("ssl", new SslHandler(sslEngine));
}
pipeline.addLast("http-codec", new HttpClientCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("ws-handler", handler);
}
});
//System.out.println("WebSocket Client connecting");
b.connect(uri.getHost(), actualPort).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.cause() != null) {
handler.handshakeFuture.completeExceptionally(future.cause());
}
}
});
return handler.handshakeFuture;
}
public WebsocketClientHandshaker(URI webSocketURL, HttpHeaders customHeaders, int maxFramePayloadLength) {
super(webSocketURL, WebSocketVersion.V13, null, false, customHeaders, maxFramePayloadLength);
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipe = ch.pipeline();
if(_sslCfg != null){ //ssl
SslContext sslCtx = null;
if(_isServer){
sslCtx = SslContextBuilder.forServer(new File(_sslCfg.getCertPath()),
new File(_sslCfg.getPemPath())).build();
}else{
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
}
SslHandler sslHandler = sslCtx.newHandler(ch.alloc());
pipe.addLast(sslHandler);
}
//
if(_decodeType == DFActorDefine.TCP_DECODE_WEBSOCKET){
if(_isServer){
pipe.addLast(new HttpServerCodec());
pipe.addLast(new HttpObjectAggregator(_maxLen));
pipe.addLast(new DFWSRequestHandler("/"+_wsSfx));
pipe.addLast(new WebSocketServerProtocolHandler("/"+_wsSfx, null, true));
if(_customHandler == null){
pipe.addLast(new TcpWsHandler(_actorId, _requestId, _decodeType, (DFActorTcpDispatcher) _dispatcher, _decoder, _encoder));
}else{
pipe.addLast(_customHandler);
}
}else{
pipe.addLast(new HttpClientCodec());
pipe.addLast(new HttpObjectAggregator(_maxLen));
if(_customHandler == null){
DFWsClientHandler handler =
new DFWsClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
new URI(_wsSfx), WebSocketVersion.V13, null, false, new DefaultHttpHeaders()),
_actorId, _requestId, _decodeType, (DFActorTcpDispatcher) _dispatcher, _decoder, _encoder);
pipe.addLast(handler);
}else{
pipe.addLast(_customHandler);
}
}
}
else if(_decodeType == DFActorDefine.TCP_DECODE_HTTP){
if(_isServer){
// pipe.addLast(new HttpServerCodec());
pipe.addLast(new HttpRequestDecoder());
pipe.addLast(new HttpObjectAggregator(_maxLen));
pipe.addLast(new HttpResponseEncoder());
pipe.addLast(new ChunkedWriteHandler());
if(_customHandler == null){
pipe.addLast(new DFHttpSvrHandler(_actorId, _requestId, _decoder, (DFHttpDispatcher) _dispatcher, (CbHttpServer) _userHandler));
}else{
pipe.addLast(_customHandler);
}
}else{ //client
pipe.addLast(new HttpClientCodec());
pipe.addLast(new HttpObjectAggregator(_maxLen));
if(_customHandler == null){
pipe.addLast(new DFHttpCliHandler(_actorId, _requestId, _decoder, (DFHttpDispatcher) _dispatcher,
(CbHttpClient) _userHandler, (DFHttpCliReqWrap) _reqData));
}else{
pipe.addLast(_customHandler);
}
}
}
else{
if(_decodeType == DFActorDefine.TCP_DECODE_LENGTH){ //length base field
pipe.addLast(new LengthFieldBasedFrameDecoder(_maxLen, 0, 2, 0, 2));
}
if(_customHandler == null){
pipe.addLast(new TcpHandler(_actorId, _requestId, _decodeType, (DFActorTcpDispatcher) _dispatcher, _decoder, _encoder));
}else{
pipe.addLast(_customHandler);
}
}
}
@Override
public String[] getSupportedVersions() {
return new String[]{WebSocketVersion.V13.toHttpHeaderValue()};
}
public LBAwareAwsSigV4ClientHandshaker(URI webSocketURL, WebSocketVersion version, String subprotocol, boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength, ChainedSigV4PropertiesProvider sigV4PropertiesProvider, HandshakeRequestConfig handshakeRequestConfig) {
super(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength);
this.sigV4PropertiesProvider = sigV4PropertiesProvider;
this.handshakeRequestConfig = handshakeRequestConfig;
this.sigV4Properties = this.loadProperties();
}
public ChannelFuture connect(boolean reconnect) throws SSLException, URISyntaxException, InterruptedException {
QueryStringEncoder queryEncoder = new QueryStringEncoder(this.tunnelServerUrl);
queryEncoder.addParam("method", "agentRegister");
if (id != null) {
queryEncoder.addParam("id", id);
}
// ws://127.0.0.1:7777/ws?method=agentRegister
final URI agentRegisterURI = queryEncoder.toUri();
logger.info("Try to register arthas agent, uri: {}", agentRegisterURI);
String scheme = agentRegisterURI.getScheme() == null ? "ws" : agentRegisterURI.getScheme();
final String host = agentRegisterURI.getHost() == null ? "127.0.0.1" : agentRegisterURI.getHost();
final int port;
if (agentRegisterURI.getPort() == -1) {
if ("ws".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("wss".equalsIgnoreCase(scheme)) {
port = 443;
} else {
port = -1;
}
} else {
port = agentRegisterURI.getPort();
}
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
throw new IllegalArgumentException("Only WS(S) is supported. tunnelServerUrl: " + tunnelServerUrl);
}
final boolean ssl = "wss".equalsIgnoreCase(scheme);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
WebSocketClientHandshaker newHandshaker = WebSocketClientHandshakerFactory.newHandshaker(agentRegisterURI,
WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
final WebSocketClientProtocolHandler websocketClientHandler = new WebSocketClientProtocolHandler(newHandshaker);
final TunnelClientSocketClientHandler handler = new TunnelClientSocketClientHandler(TunnelClient.this);
Bootstrap bs = new Bootstrap();
bs.group(eventLoopGroup).channel(NioSocketChannel.class).remoteAddress(host, port)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), websocketClientHandler,
handler);
}
});
ChannelFuture connectFuture = bs.connect();
if (reconnect) {
connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.cause() != null) {
logger.error("connect to tunnel server error, uri: {}", tunnelServerUrl, future.cause());
}
}
});
}
channel = connectFuture.sync().channel();
return handler.registerFuture();
}
public void start() throws URISyntaxException, SSLException, InterruptedException {
String scheme = tunnelServerURI.getScheme() == null ? "ws" : tunnelServerURI.getScheme();
final String host = tunnelServerURI.getHost() == null ? "127.0.0.1" : tunnelServerURI.getHost();
final int port;
if (tunnelServerURI.getPort() == -1) {
if ("ws".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("wss".equalsIgnoreCase(scheme)) {
port = 443;
} else {
port = -1;
}
} else {
port = tunnelServerURI.getPort();
}
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
logger.error("Only WS(S) is supported, uri: {}", tunnelServerURI);
return;
}
final boolean ssl = "wss".equalsIgnoreCase(scheme);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// connect to local server
WebSocketClientHandshaker newHandshaker = WebSocketClientHandshakerFactory.newHandshaker(tunnelServerURI,
WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
final WebSocketClientProtocolHandler websocketClientHandler = new WebSocketClientProtocolHandler(newHandshaker);
final ForwardClientSocketClientHandler forwardClientSocketClientHandler = new ForwardClientSocketClientHandler(
localServerURI);
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), websocketClientHandler,
forwardClientSocketClientHandler);
}
});
channel = b.connect(tunnelServerURI.getHost(), port).sync().channel();
logger.info("forward client connect to server success, uri: " + tunnelServerURI);
}