下面列出了怎么用 io.netty.handler.codec.http.HttpResponseDecoder 的API类实例代码及写法,或者点击链接到github查看源代码。
private void handleResponse(@Nullable ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
EmbeddedChannel http = new EmbeddedChannel(new HttpResponseDecoder());
try {
http.writeInbound(Unpooled.unreleasableBuffer(packet.content()));
http.finish();
while (true) {
Object result = http.readInbound();
if (result == null) {
break;
}
if (result instanceof HttpResponse) {
HttpResponse res = (HttpResponse)result;
switch (res.getStatus().code()) {
case 200: handleUpnpMsearchResponse(packet, res); break;
default: log.debug("unknown upnp response: {}", res.getStatus().code()); break;
}
}
}
} finally {
http.finishAndReleaseAll();
}
}
/**
* 初始化 连接后端真正服务器
*/
private void initRealServerBoot() {
//初始化
realServerBootstrap = new Bootstrap();
realServerGroup = new NioEventLoopGroup();
realServerBootstrap.group(realServerGroup);
realServerBootstrap.channel(NioSocketChannel.class);
realServerBootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TCPHandler());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new HttpObjectAggregator(maxContentLength));
ch.pipeline().addLast(new HttpSendHandler());
}
});
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if(sslCtx!=null)
{
p.addLast(new SslHandler(sslCtx.newEngine(ch.alloc())));
}
p.addLast(new HttpResponseDecoder());
//限制contentLength
p.addLast(new HttpObjectAggregator(65536));
p.addLast(new HttpRequestEncoder());
//大文件传输处理
// p.addLast(new ChunkedWriteHandler());
if(unPoolMsg)
{
p.addLast(new DefaultListenerHandler<HttpResponse>(new HttpListenerProxy(listener)));
}else
{
p.addLast(new DefaultListenerHandler<HttpResponse>(listener));
}
}
@SuppressWarnings("FutureReturnValueIgnored")
private void releaseUploadChannel(Channel ch) {
if (ch.isOpen()) {
try {
ch.pipeline().remove(HttpResponseDecoder.class);
ch.pipeline().remove(HttpObjectAggregator.class);
ch.pipeline().remove(HttpRequestEncoder.class);
ch.pipeline().remove(ChunkedWriteHandler.class);
ch.pipeline().remove(HttpUploadHandler.class);
} catch (NoSuchElementException e) {
// If the channel is in the process of closing but not yet closed, some handlers could have
// been removed and would cause NoSuchElement exceptions to be thrown. Because handlers are
// removed in reverse-order, if we get a NoSuchElement exception, the following handlers
// should have been removed.
}
}
channelPool.release(ch);
}
@Override
protected void doStart(Listener listener) throws Throwable {
workerGroup = new NioEventLoopGroup(http_work, new DefaultThreadFactory(ThreadNames.T_HTTP_CLIENT));
b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.SO_REUSEADDR, true);
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new HttpResponseDecoder());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(maxContentLength));
ch.pipeline().addLast("encoder", new HttpRequestEncoder());
ch.pipeline().addLast("handler", new HttpClientHandler(NettyHttpClient.this));
}
});
timer = new HashedWheelTimer(new NamedThreadFactory(T_HTTP_TIMER), 1, TimeUnit.SECONDS, 64);
listener.onSuccess();
}
@SuppressWarnings("FutureReturnValueIgnored")
private void releaseUploadChannel(Channel ch) {
if (ch.isOpen()) {
try {
ch.pipeline().remove(IdleTimeoutHandler.class);
ch.pipeline().remove(HttpResponseDecoder.class);
ch.pipeline().remove(HttpObjectAggregator.class);
ch.pipeline().remove(HttpRequestEncoder.class);
ch.pipeline().remove(ChunkedWriteHandler.class);
ch.pipeline().remove(HttpUploadHandler.class);
} catch (NoSuchElementException e) {
// If the channel is in the process of closing but not yet closed, some handlers could have
// been removed and would cause NoSuchElement exceptions to be thrown. Because handlers are
// removed in reverse-order, if we get a NoSuchElement exception, the following handlers
// should have been removed.
}
}
channelPool.release(ch);
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
SslHandler sslHandler = configureClientSSLOnDemand();
if (sslHandler != null) {
LOG.log(Level.FINE,
"Server SSL handler configured and added as an interceptor against the ChannelPipeline: {}",
sslHandler);
pipeline.addLast("ssl", sslHandler);
}
pipeline.addLast("decoder", new HttpResponseDecoder());
// TODO need to configure the aggregator size
pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
if (readTimeout > 0) {
pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
}
pipeline.addLast("client", new NettyHttpClientHandler());
}
/**
* Begins the opening handshake
*
* @param channel
* Channel
* @param promise
* the {@link ChannelPromise} to be notified when the opening handshake is sent
*/
public final ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
FullHttpRequest request = newHandshakeRequest();
HttpResponseDecoder decoder = channel.pipeline().get(HttpResponseDecoder.class);
if (decoder == null) {
HttpClientCodec codec = channel.pipeline().get(HttpClientCodec.class);
if (codec == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpResponseDecoder or HttpClientCodec"));
return promise;
}
}
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
if (ctx == null) {
ctx = p.context(HttpClientCodec.class);
}
if (ctx == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpRequestEncoder or HttpClientCodec"));
return;
}
p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
private static void testPerformOpeningHandshake0(boolean subProtocol) {
EmbeddedChannel ch = new EmbeddedChannel(
new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder());
FullHttpRequest req = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat");
req.headers().set(HttpHeaderNames.HOST, "server.example.com");
req.headers().set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET);
req.headers().set(HttpHeaderNames.CONNECTION, "Upgrade");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_KEY, "dGhlIHNhbXBsZSBub25jZQ==");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_ORIGIN, "http://example.com");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, "8");
if (subProtocol) {
new WebSocketServerHandshaker08(
"ws://example.com/chat", "chat", false, Integer.MAX_VALUE, false).handshake(ch, req);
} else {
new WebSocketServerHandshaker08(
"ws://example.com/chat", null, false, Integer.MAX_VALUE, false).handshake(ch, req);
}
ByteBuf resBuf = ch.readOutbound();
EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
ch2.writeInbound(resBuf);
HttpResponse res = ch2.readInbound();
Assert.assertEquals(
"s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", res.headers().get(HttpHeaderNames.SEC_WEBSOCKET_ACCEPT));
if (subProtocol) {
Assert.assertEquals("chat", res.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL));
} else {
Assert.assertNull(res.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL));
}
ReferenceCountUtil.release(res);
req.release();
}
private static void testPerformOpeningHandshake0(boolean subProtocol) {
EmbeddedChannel ch = new EmbeddedChannel(
new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder());
FullHttpRequest req = new DefaultFullHttpRequest(
HTTP_1_1, HttpMethod.GET, "/chat", Unpooled.copiedBuffer("^n:ds[4U", CharsetUtil.US_ASCII));
req.headers().set(HttpHeaderNames.HOST, "server.example.com");
req.headers().set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET);
req.headers().set(HttpHeaderNames.CONNECTION, "Upgrade");
req.headers().set(HttpHeaderNames.ORIGIN, "http://example.com");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_KEY1, "4 @1 46546xW%0l 1 5");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_KEY2, "12998 5 Y3 1 .P00");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
if (subProtocol) {
new WebSocketServerHandshaker00(
"ws://example.com/chat", "chat", Integer.MAX_VALUE).handshake(ch, req);
} else {
new WebSocketServerHandshaker00(
"ws://example.com/chat", null, Integer.MAX_VALUE).handshake(ch, req);
}
EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
ch2.writeInbound(ch.readOutbound());
HttpResponse res = ch2.readInbound();
Assert.assertEquals("ws://example.com/chat", res.headers().get(HttpHeaderNames.SEC_WEBSOCKET_LOCATION));
if (subProtocol) {
Assert.assertEquals("chat", res.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL));
} else {
Assert.assertNull(res.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL));
}
LastHttpContent content = ch2.readInbound();
Assert.assertEquals("8jKS'y:G*Co,Wxa-", content.content().toString(CharsetUtil.US_ASCII));
content.release();
req.release();
}
private static void testPerformOpeningHandshake0(boolean subProtocol) {
EmbeddedChannel ch = new EmbeddedChannel(
new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder());
FullHttpRequest req = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat");
req.headers().set(HttpHeaderNames.HOST, "server.example.com");
req.headers().set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET);
req.headers().set(HttpHeaderNames.CONNECTION, "Upgrade");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_KEY, "dGhlIHNhbXBsZSBub25jZQ==");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_ORIGIN, "http://example.com");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
req.headers().set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, "13");
if (subProtocol) {
new WebSocketServerHandshaker13(
"ws://example.com/chat", "chat", false, Integer.MAX_VALUE, false).handshake(ch, req);
} else {
new WebSocketServerHandshaker13(
"ws://example.com/chat", null, false, Integer.MAX_VALUE, false).handshake(ch, req);
}
ByteBuf resBuf = ch.readOutbound();
EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
ch2.writeInbound(resBuf);
HttpResponse res = ch2.readInbound();
Assert.assertEquals(
"s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", res.headers().get(HttpHeaderNames.SEC_WEBSOCKET_ACCEPT));
if (subProtocol) {
Assert.assertEquals("chat", res.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL));
} else {
Assert.assertNull(res.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL));
}
ReferenceCountUtil.release(res);
req.release();
}
public static void pipelineAddExit(final Object thiz, final Object arg2) {
final ChannelPipeline pipeline = (ChannelPipeline)thiz;
final ChannelHandler handler = (ChannelHandler)arg2;
try {
// Server
if (handler instanceof HttpServerCodec) {
pipeline.addLast(TracingHttpServerHandler.class.getName(), new TracingHttpServerHandler());
}
else if (handler instanceof HttpRequestDecoder) {
pipeline.addLast(TracingServerChannelInboundHandlerAdapter.class.getName(), new TracingServerChannelInboundHandlerAdapter());
}
else if (handler instanceof HttpResponseEncoder) {
pipeline.addLast(TracingServerChannelOutboundHandlerAdapter.class.getName(), new TracingServerChannelOutboundHandlerAdapter());
}
else
// Client
if (handler instanceof HttpClientCodec) {
pipeline.addLast(TracingHttpClientTracingHandler.class.getName(), new TracingHttpClientTracingHandler());
}
else if (handler instanceof HttpRequestEncoder) {
pipeline.addLast(TracingClientChannelOutboundHandlerAdapter.class.getName(), new TracingClientChannelOutboundHandlerAdapter());
}
else if (handler instanceof HttpResponseDecoder) {
pipeline.addLast(TracingClientChannelInboundHandlerAdapter.class.getName(), new TracingClientChannelInboundHandlerAdapter());
}
}
catch (final IllegalArgumentException ignore) {
}
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("http-decoder",new HttpResponseDecoder());
ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536));
ch.pipeline().addLast("json-decoder",new HttpJsonResponseDecoder(UserVo.class));
ch.pipeline().addLast("http-encoder",new HttpRequestEncoder());
ch.pipeline().addLast("json-encoder",new HttpJsonRequestEncoder());
ch.pipeline().addLast("handler",new ClientHandler());
}
public void connect(int port) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast("http-decoder",
new HttpResponseDecoder());
ch.pipeline().addLast("http-aggregator",
new HttpObjectAggregator(65536));
// XML解码器
ch.pipeline().addLast(
"xml-decoder",
new HttpXmlResponseDecoder(Order.class,
true));
ch.pipeline().addLast("http-encoder",
new HttpRequestEncoder());
ch.pipeline().addLast("xml-encoder",
new HttpXmlRequestEncoder());
ch.pipeline().addLast("xmlClientHandler",
new HttpXmlClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(new InetSocketAddress(port)).sync();
// 当代客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
private void initChannelPipeline(ChannelPipeline pipeline, ServerChannelHandler serverChannelHandler,
int idleTimeoutMsec) {
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("idle", new IdleStateHandler(0, 0, idleTimeoutMsec / 1000));
pipeline.addLast("handler", serverChannelHandler);
}
/**
* Begins the opening handshake
*
* @param channel
* Channel
* @param promise
* the {@link ChannelPromise} to be notified when the opening handshake is sent
*/
public final ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
FullHttpRequest request = newHandshakeRequest();
HttpResponseDecoder decoder = channel.pipeline().get(HttpResponseDecoder.class);
if (decoder == null) {
HttpClientCodec codec = channel.pipeline().get(HttpClientCodec.class);
if (codec == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpResponseDecoder or HttpClientCodec"));
return promise;
}
}
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
if (ctx == null) {
ctx = p.context(HttpClientCodec.class);
}
if (ctx == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpRequestEncoder or HttpClientCodec"));
return;
}
p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
private static void testPerformOpeningHandshake0(boolean subProtocol) {
EmbeddedChannel ch = new EmbeddedChannel(
new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder());
FullHttpRequest req = ReferenceCountUtil.releaseLater(
new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat"));
req.headers().set(Names.HOST, "server.example.com");
req.headers().set(Names.UPGRADE, WEBSOCKET.toLowerCase());
req.headers().set(Names.CONNECTION, "Upgrade");
req.headers().set(Names.SEC_WEBSOCKET_KEY, "dGhlIHNhbXBsZSBub25jZQ==");
req.headers().set(Names.SEC_WEBSOCKET_ORIGIN, "http://example.com");
req.headers().set(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
req.headers().set(Names.SEC_WEBSOCKET_VERSION, "8");
if (subProtocol) {
new WebSocketServerHandshaker08(
"ws://example.com/chat", "chat", false, Integer.MAX_VALUE).handshake(ch, req);
} else {
new WebSocketServerHandshaker08(
"ws://example.com/chat", null, false, Integer.MAX_VALUE).handshake(ch, req);
}
ByteBuf resBuf = (ByteBuf) ch.readOutbound();
EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
ch2.writeInbound(resBuf);
HttpResponse res = (HttpResponse) ch2.readInbound();
Assert.assertEquals(
"s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", res.headers().get(Names.SEC_WEBSOCKET_ACCEPT));
if (subProtocol) {
Assert.assertEquals("chat", res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
} else {
Assert.assertNull(res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
}
ReferenceCountUtil.release(res);
}
private static void testPerformOpeningHandshake0(boolean subProtocol) {
EmbeddedChannel ch = new EmbeddedChannel(
new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder());
FullHttpRequest req = ReferenceCountUtil.releaseLater(new DefaultFullHttpRequest(
HTTP_1_1, HttpMethod.GET, "/chat", Unpooled.copiedBuffer("^n:ds[4U", CharsetUtil.US_ASCII)));
req.headers().set(Names.HOST, "server.example.com");
req.headers().set(Names.UPGRADE, WEBSOCKET.toLowerCase());
req.headers().set(Names.CONNECTION, "Upgrade");
req.headers().set(Names.ORIGIN, "http://example.com");
req.headers().set(Names.SEC_WEBSOCKET_KEY1, "4 @1 46546xW%0l 1 5");
req.headers().set(Names.SEC_WEBSOCKET_KEY2, "12998 5 Y3 1 .P00");
req.headers().set(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
if (subProtocol) {
new WebSocketServerHandshaker00(
"ws://example.com/chat", "chat", Integer.MAX_VALUE).handshake(ch, req);
} else {
new WebSocketServerHandshaker00(
"ws://example.com/chat", null, Integer.MAX_VALUE).handshake(ch, req);
}
EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
ch2.writeInbound(ch.readOutbound());
HttpResponse res = (HttpResponse) ch2.readInbound();
Assert.assertEquals("ws://example.com/chat", res.headers().get(Names.SEC_WEBSOCKET_LOCATION));
if (subProtocol) {
Assert.assertEquals("chat", res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
} else {
Assert.assertNull(res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
}
LastHttpContent content = (LastHttpContent) ch2.readInbound();
Assert.assertEquals("8jKS'y:G*Co,Wxa-", content.content().toString(CharsetUtil.US_ASCII));
content.release();
}
private static void testPerformOpeningHandshake0(boolean subProtocol) {
EmbeddedChannel ch = new EmbeddedChannel(
new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder());
FullHttpRequest req = ReferenceCountUtil.releaseLater(
new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat"));
req.headers().set(Names.HOST, "server.example.com");
req.headers().set(Names.UPGRADE, WEBSOCKET.toLowerCase());
req.headers().set(Names.CONNECTION, "Upgrade");
req.headers().set(Names.SEC_WEBSOCKET_KEY, "dGhlIHNhbXBsZSBub25jZQ==");
req.headers().set(Names.SEC_WEBSOCKET_ORIGIN, "http://example.com");
req.headers().set(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
req.headers().set(Names.SEC_WEBSOCKET_VERSION, "13");
if (subProtocol) {
new WebSocketServerHandshaker13(
"ws://example.com/chat", "chat", false, Integer.MAX_VALUE).handshake(ch, req);
} else {
new WebSocketServerHandshaker13(
"ws://example.com/chat", null, false, Integer.MAX_VALUE).handshake(ch, req);
}
ByteBuf resBuf = (ByteBuf) ch.readOutbound();
EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
ch2.writeInbound(resBuf);
HttpResponse res = (HttpResponse) ch2.readInbound();
Assert.assertEquals(
"s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", res.headers().get(Names.SEC_WEBSOCKET_ACCEPT));
if (subProtocol) {
Assert.assertEquals("chat", res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
} else {
Assert.assertNull(res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
}
ReferenceCountUtil.release(res);
}
public static HttpResponse decodeResponse(List<ByteBuf> payload) {
EmbeddedChannel channel = new EmbeddedChannel();
channel
.pipeline()
.addLast("http response decoder", new HttpResponseDecoder())
.addLast("http message aggregator", new HttpObjectAggregator(1048576));
for (ByteBuf buffer : payload) {
channel.writeInbound(buffer);
}
HttpResponse response = channel.readInbound();
return response;
}
@Override
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
/*
* If we use a HttpClientCodec here instead of the HttpRequestEncoder and the HttpResponseDecoder
* and there is a HttpProxyHandler in the pipeline, that ProxyHandler will add another HttpClientCodec
* for communication with the proxy. When the WebSocketClientHandshaker tries to exchange the codecs in
* the pipeline, it will mix up the two HttpRequestEncoders in the pipeline and exchange the wrong one.
* HttpReqestEncoder and HttpResponseDecoder has precedence over the HttpClientCodec, so the
* WebSocketClientHandshaker will remove these handlers inserted here and will leave the HttpClientCodec
* added by the HttpProxyHandler alone.
*/
pipeline.addLast(new HttpResponseDecoder());
pipeline.addLast(new HttpRequestEncoder());
pipeline.addLast(new HttpObjectAggregator(8192));
}
private void testHttpResponseAndFrameInSameBuffer(boolean codec) {
String url = "ws://localhost:9999/ws";
final WebSocketClientHandshaker shaker = newHandshaker(URI.create(url));
final WebSocketClientHandshaker handshaker = new WebSocketClientHandshaker(
shaker.uri(), shaker.version(), null, EmptyHttpHeaders.INSTANCE, Integer.MAX_VALUE) {
@Override
protected FullHttpRequest newHandshakeRequest() {
return shaker.newHandshakeRequest();
}
@Override
protected void verify(FullHttpResponse response) {
// Not do any verification, so we not need to care sending the correct headers etc in the test,
// which would just make things more complicated.
}
@Override
protected WebSocketFrameDecoder newWebsocketDecoder() {
return shaker.newWebsocketDecoder();
}
@Override
protected WebSocketFrameEncoder newWebSocketEncoder() {
return shaker.newWebSocketEncoder();
}
};
byte[] data = new byte[24];
PlatformDependent.threadLocalRandom().nextBytes(data);
// Create a EmbeddedChannel which we will use to encode a BinaryWebsocketFrame to bytes and so use these
// to test the actual handshaker.
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(url, null, false);
WebSocketServerHandshaker socketServerHandshaker = factory.newHandshaker(shaker.newHandshakeRequest());
EmbeddedChannel websocketChannel = new EmbeddedChannel(socketServerHandshaker.newWebSocketEncoder(),
socketServerHandshaker.newWebsocketDecoder());
assertTrue(websocketChannel.writeOutbound(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(data))));
byte[] bytes = "HTTP/1.1 101 Switching Protocols\r\nContent-Length: 0\r\n\r\n".getBytes(CharsetUtil.US_ASCII);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(bytes));
for (;;) {
ByteBuf frameBytes = websocketChannel.readOutbound();
if (frameBytes == null) {
break;
}
compositeByteBuf.addComponent(true, frameBytes);
}
EmbeddedChannel ch = new EmbeddedChannel(new HttpObjectAggregator(Integer.MAX_VALUE),
new SimpleChannelInboundHandler<FullHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
handshaker.finishHandshake(ctx.channel(), msg);
ctx.pipeline().remove(this);
}
});
if (codec) {
ch.pipeline().addFirst(new HttpClientCodec());
} else {
ch.pipeline().addFirst(new HttpRequestEncoder(), new HttpResponseDecoder());
}
// We need to first write the request as HttpClientCodec will fail if we receive a response before a request
// was written.
shaker.handshake(ch).syncUninterruptibly();
for (;;) {
// Just consume the bytes, we are not interested in these.
ByteBuf buf = ch.readOutbound();
if (buf == null) {
break;
}
buf.release();
}
assertTrue(ch.writeInbound(compositeByteBuf));
assertTrue(ch.finish());
BinaryWebSocketFrame frame = ch.readInbound();
ByteBuf expect = Unpooled.wrappedBuffer(data);
try {
assertEquals(expect, frame.content());
assertTrue(frame.isFinalFragment());
assertEquals(0, frame.rsv());
} finally {
expect.release();
frame.release();
}
}
@SuppressWarnings("FutureReturnValueIgnored")
private Channel acquireUploadChannel() throws InterruptedException {
Promise<Channel> channelReady = eventLoop.next().newPromise();
channelPool
.acquire()
.addListener(
(Future<Channel> channelAcquired) -> {
if (!channelAcquired.isSuccess()) {
channelReady.setFailure(channelAcquired.cause());
return;
}
try {
Channel ch = channelAcquired.getNow();
ChannelPipeline p = ch.pipeline();
if (!isChannelPipelineEmpty(p)) {
channelReady.setFailure(
new IllegalStateException("Channel pipeline is not empty."));
return;
}
p.addLast(new HttpResponseDecoder());
// The 10KiB limit was chosen at random. We only expect HTTP servers to respond with
// an error message in the body and that should always be less than 10KiB.
p.addLast(new HttpObjectAggregator(10 * 1024));
p.addLast(new HttpRequestEncoder());
p.addLast(new ChunkedWriteHandler());
synchronized (credentialsLock) {
p.addLast(new HttpUploadHandler(creds));
}
channelReady.setSuccess(ch);
} catch (Throwable t) {
channelReady.setFailure(t);
}
});
try {
return channelReady.get();
} catch (ExecutionException e) {
PlatformDependent.throwException(e.getCause());
return null;
}
}
/**
* Validates and finishes the opening handshake initiated by {@link #handshake}}.
*
* @param channel
* Channel
* @param response
* HTTP response containing the closing handshake details
*/
public final void finishHandshake(Channel channel, FullHttpResponse response) {
verify(response);
// Verify the subprotocol that we received from the server.
// This must be one of our expected subprotocols - or null/empty if we didn't want to speak a subprotocol
String receivedProtocol = response.headers().get(HttpHeaders.Names.SEC_WEBSOCKET_PROTOCOL);
receivedProtocol = receivedProtocol != null ? receivedProtocol.trim() : null;
String expectedProtocol = expectedSubprotocol != null ? expectedSubprotocol : "";
boolean protocolValid = false;
if (expectedProtocol.isEmpty() && receivedProtocol == null) {
// No subprotocol required and none received
protocolValid = true;
setActualSubprotocol(expectedSubprotocol); // null or "" - we echo what the user requested
} else if (!expectedProtocol.isEmpty() && receivedProtocol != null && !receivedProtocol.isEmpty()) {
// We require a subprotocol and received one -> verify it
for (String protocol : StringUtil.split(expectedSubprotocol, ',')) {
if (protocol.trim().equals(receivedProtocol)) {
protocolValid = true;
setActualSubprotocol(receivedProtocol);
break;
}
}
} // else mixed cases - which are all errors
if (!protocolValid) {
throw new WebSocketHandshakeException(String.format(
"Invalid subprotocol. Actual: %s. Expected one of: %s",
receivedProtocol, expectedSubprotocol));
}
setHandshakeComplete();
ChannelPipeline p = channel.pipeline();
// Remove decompressor from pipeline if its in use
HttpContentDecompressor decompressor = p.get(HttpContentDecompressor.class);
if (decompressor != null) {
p.remove(decompressor);
}
// Remove aggregator if present before
HttpObjectAggregator aggregator = p.get(HttpObjectAggregator.class);
if (aggregator != null) {
p.remove(aggregator);
}
ChannelHandlerContext ctx = p.context(HttpResponseDecoder.class);
if (ctx == null) {
ctx = p.context(HttpClientCodec.class);
if (ctx == null) {
throw new IllegalStateException("ChannelPipeline does not contain " +
"a HttpRequestEncoder or HttpClientCodec");
}
p.replace(ctx.name(), "ws-decoder", newWebsocketDecoder());
} else {
if (p.get(HttpRequestEncoder.class) != null) {
p.remove(HttpRequestEncoder.class);
}
p.replace(ctx.name(),
"ws-decoder", newWebsocketDecoder());
}
}
@SuppressWarnings("FutureReturnValueIgnored")
private Channel acquireUploadChannel() throws InterruptedException {
Promise<Channel> channelReady = eventLoop.next().newPromise();
channelPool
.acquire()
.addListener(
(Future<Channel> channelAcquired) -> {
if (!channelAcquired.isSuccess()) {
channelReady.setFailure(channelAcquired.cause());
return;
}
try {
Channel ch = channelAcquired.getNow();
ChannelPipeline p = ch.pipeline();
if (!isChannelPipelineEmpty(p)) {
channelReady.setFailure(
new IllegalStateException("Channel pipeline is not empty."));
return;
}
p.addFirst(
"timeout-handler",
new IdleTimeoutHandler(timeoutSeconds, WriteTimeoutException.INSTANCE));
p.addLast(new HttpResponseDecoder());
// The 10KiB limit was chosen arbitrarily. We only expect HTTP servers to respond
// with an error message in the body, and that should always be less than 10KiB. If
// the response is larger than 10KiB, HttpUploadHandler will catch the
// TooLongFrameException that HttpObjectAggregator throws and convert it to an
// IOException.
p.addLast(new HttpObjectAggregator(10 * 1024));
p.addLast(new HttpRequestEncoder());
p.addLast(new ChunkedWriteHandler());
synchronized (credentialsLock) {
p.addLast(new HttpUploadHandler(creds, extraHttpHeaders));
}
if (!ch.eventLoop().inEventLoop()) {
// If addLast is called outside an event loop, then it doesn't complete until the
// event loop is run again. In that case, a message sent to the last handler gets
// delivered to the last non-pending handler, which will most likely end up
// throwing UnsupportedMessageTypeException. Therefore, we only complete the
// promise in the event loop.
ch.eventLoop().execute(() -> channelReady.setSuccess(ch));
} else {
channelReady.setSuccess(ch);
}
} catch (Throwable t) {
channelReady.setFailure(t);
}
});
try {
return channelReady.get();
} catch (ExecutionException e) {
PlatformDependent.throwException(e.getCause());
return null;
}
}