下面列出了怎么用 io.netty.handler.codec.http.HttpRequestEncoder 的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if(sslCtx!=null)
{
p.addLast(new SslHandler(sslCtx.newEngine(ch.alloc())));
}
p.addLast(new HttpResponseDecoder());
//限制contentLength
p.addLast(new HttpObjectAggregator(65536));
p.addLast(new HttpRequestEncoder());
//大文件传输处理
// p.addLast(new ChunkedWriteHandler());
if(unPoolMsg)
{
p.addLast(new DefaultListenerHandler<HttpResponse>(new HttpListenerProxy(listener)));
}else
{
p.addLast(new DefaultListenerHandler<HttpResponse>(listener));
}
}
@SuppressWarnings("FutureReturnValueIgnored")
private void releaseUploadChannel(Channel ch) {
if (ch.isOpen()) {
try {
ch.pipeline().remove(HttpResponseDecoder.class);
ch.pipeline().remove(HttpObjectAggregator.class);
ch.pipeline().remove(HttpRequestEncoder.class);
ch.pipeline().remove(ChunkedWriteHandler.class);
ch.pipeline().remove(HttpUploadHandler.class);
} catch (NoSuchElementException e) {
// If the channel is in the process of closing but not yet closed, some handlers could have
// been removed and would cause NoSuchElement exceptions to be thrown. Because handlers are
// removed in reverse-order, if we get a NoSuchElement exception, the following handlers
// should have been removed.
}
}
channelPool.release(ch);
}
@Override
protected void doStart(Listener listener) throws Throwable {
workerGroup = new NioEventLoopGroup(http_work, new DefaultThreadFactory(ThreadNames.T_HTTP_CLIENT));
b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.SO_REUSEADDR, true);
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new HttpResponseDecoder());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(maxContentLength));
ch.pipeline().addLast("encoder", new HttpRequestEncoder());
ch.pipeline().addLast("handler", new HttpClientHandler(NettyHttpClient.this));
}
});
timer = new HashedWheelTimer(new NamedThreadFactory(T_HTTP_TIMER), 1, TimeUnit.SECONDS, 64);
listener.onSuccess();
}
@SuppressWarnings("FutureReturnValueIgnored")
private void releaseUploadChannel(Channel ch) {
if (ch.isOpen()) {
try {
ch.pipeline().remove(IdleTimeoutHandler.class);
ch.pipeline().remove(HttpResponseDecoder.class);
ch.pipeline().remove(HttpObjectAggregator.class);
ch.pipeline().remove(HttpRequestEncoder.class);
ch.pipeline().remove(ChunkedWriteHandler.class);
ch.pipeline().remove(HttpUploadHandler.class);
} catch (NoSuchElementException e) {
// If the channel is in the process of closing but not yet closed, some handlers could have
// been removed and would cause NoSuchElement exceptions to be thrown. Because handlers are
// removed in reverse-order, if we get a NoSuchElement exception, the following handlers
// should have been removed.
}
}
channelPool.release(ch);
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
SslHandler sslHandler = configureClientSSLOnDemand();
if (sslHandler != null) {
LOG.log(Level.FINE,
"Server SSL handler configured and added as an interceptor against the ChannelPipeline: {}",
sslHandler);
pipeline.addLast("ssl", sslHandler);
}
pipeline.addLast("decoder", new HttpResponseDecoder());
// TODO need to configure the aggregator size
pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
if (readTimeout > 0) {
pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
}
pipeline.addLast("client", new NettyHttpClientHandler());
}
/**
* Initialize our {@link ChannelPipeline} to connect the upstream server.
* LittleProxy acts as a client here.
*
* A {@link ChannelPipeline} invokes the read (Inbound) handlers in
* ascending ordering of the list and then the write (Outbound) handlers in
* descending ordering.
*
* Regarding the Javadoc of {@link HttpObjectAggregator} it's needed to have
* the {@link HttpResponseEncoder} or {@link HttpRequestEncoder} before the
* {@link HttpObjectAggregator} in the {@link ChannelPipeline}.
*/
private void initChannelPipeline(ChannelPipeline pipeline, HttpRequest httpRequest) {
if (trafficHandler != null) {
pipeline.addLast("global-traffic-shaping", trafficHandler);
}
pipeline.addLast("bytesReadMonitor", bytesReadMonitor);
pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor);
if ( proxyServer.isSendProxyProtocol()) {
pipeline.addLast("proxy-protocol-encoder", new HAProxyMessageEncoder());
}
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("decoder", new HeadAwareHttpResponseDecoder(
proxyServer.getMaxInitialLineLength(),
proxyServer.getMaxHeaderSize(),
proxyServer.getMaxChunkSize()));
// Enable aggregation for filtering if necessary
int numberOfBytesToBuffer = proxyServer.getFiltersSource()
.getMaximumResponseBufferSizeInBytes();
if (numberOfBytesToBuffer > 0) {
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
pipeline.addLast("responseReadMonitor", responseReadMonitor);
pipeline.addLast("requestWrittenMonitor", requestWrittenMonitor);
// Set idle timeout
pipeline.addLast(
"idle",
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
pipeline.addLast("handler", this);
}
@Setup(Level.Trial)
public void setup() {
byte[] bytes = new byte[256];
content = Unpooled.buffer(bytes.length);
content.writeBytes(bytes);
ByteBuf testContent = Unpooled.unreleasableBuffer(content.asReadOnly());
HttpHeaders headersWithChunked = new DefaultHttpHeaders(false);
headersWithChunked.add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
HttpHeaders headersWithContentLength = new DefaultHttpHeaders(false);
headersWithContentLength.add(HttpHeaderNames.CONTENT_LENGTH, testContent.readableBytes());
fullRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/index", testContent,
headersWithContentLength, EmptyHttpHeaders.INSTANCE);
contentLengthRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/index",
headersWithContentLength);
chunkedRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/index", headersWithChunked);
lastContent = new DefaultLastHttpContent(testContent, false);
encoder = new HttpRequestEncoder();
context = new EmbeddedChannelWriteReleaseHandlerContext(pooledAllocator ? PooledByteBufAllocator.DEFAULT :
UnpooledByteBufAllocator.DEFAULT, encoder) {
@Override
protected void handleException(Throwable t) {
handleUnexpectedException(t);
}
};
}
/**
* Begins the opening handshake
*
* @param channel
* Channel
* @param promise
* the {@link ChannelPromise} to be notified when the opening handshake is sent
*/
public final ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
FullHttpRequest request = newHandshakeRequest();
HttpResponseDecoder decoder = channel.pipeline().get(HttpResponseDecoder.class);
if (decoder == null) {
HttpClientCodec codec = channel.pipeline().get(HttpClientCodec.class);
if (codec == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpResponseDecoder or HttpClientCodec"));
return promise;
}
}
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
if (ctx == null) {
ctx = p.context(HttpClientCodec.class);
}
if (ctx == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpRequestEncoder or HttpClientCodec"));
return;
}
p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
/**
* Initialize our {@link ChannelPipeline} to connect the upstream server.
* LittleProxy acts as a client here.
*
* A {@link ChannelPipeline} invokes the read (Inbound) handlers in
* ascending ordering of the list and then the write (Outbound) handlers in
* descending ordering.
*
* Regarding the Javadoc of {@link HttpObjectAggregator} it's needed to have
* the {@link HttpResponseEncoder} or {@link HttpRequestEncoder} before the
* {@link HttpObjectAggregator} in the {@link ChannelPipeline}.
*
* @param pipeline
* @param httpRequest
*/
private void initChannelPipeline(ChannelPipeline pipeline,
HttpRequest httpRequest) {
if (trafficHandler != null) {
pipeline.addLast("global-traffic-shaping", trafficHandler);
}
pipeline.addLast("bytesReadMonitor", bytesReadMonitor);
pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("decoder", new HeadAwareHttpResponseDecoder(
proxyServer.getMaxInitialLineLength(),
proxyServer.getMaxHeaderSize(),
proxyServer.getMaxChunkSize()));
// Enable aggregation for filtering if necessary
int numberOfBytesToBuffer = proxyServer.getFiltersSource()
.getMaximumResponseBufferSizeInBytes();
if (numberOfBytesToBuffer > 0) {
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
pipeline.addLast("responseReadMonitor", responseReadMonitor);
pipeline.addLast("requestWrittenMonitor", requestWrittenMonitor);
// Set idle timeout
pipeline.addLast(
"idle",
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
pipeline.addLast("handler", this);
}
public static void pipelineAddExit(final Object thiz, final Object arg2) {
final ChannelPipeline pipeline = (ChannelPipeline)thiz;
final ChannelHandler handler = (ChannelHandler)arg2;
try {
// Server
if (handler instanceof HttpServerCodec) {
pipeline.addLast(TracingHttpServerHandler.class.getName(), new TracingHttpServerHandler());
}
else if (handler instanceof HttpRequestDecoder) {
pipeline.addLast(TracingServerChannelInboundHandlerAdapter.class.getName(), new TracingServerChannelInboundHandlerAdapter());
}
else if (handler instanceof HttpResponseEncoder) {
pipeline.addLast(TracingServerChannelOutboundHandlerAdapter.class.getName(), new TracingServerChannelOutboundHandlerAdapter());
}
else
// Client
if (handler instanceof HttpClientCodec) {
pipeline.addLast(TracingHttpClientTracingHandler.class.getName(), new TracingHttpClientTracingHandler());
}
else if (handler instanceof HttpRequestEncoder) {
pipeline.addLast(TracingClientChannelOutboundHandlerAdapter.class.getName(), new TracingClientChannelOutboundHandlerAdapter());
}
else if (handler instanceof HttpResponseDecoder) {
pipeline.addLast(TracingClientChannelInboundHandlerAdapter.class.getName(), new TracingClientChannelInboundHandlerAdapter());
}
}
catch (final IllegalArgumentException ignore) {
}
}
/**
* Initialize our {@link ChannelPipeline} to connect the upstream server.
* LittleProxy acts as a client here.
*
* A {@link ChannelPipeline} invokes the read (Inbound) handlers in
* ascending ordering of the list and then the write (Outbound) handlers in
* descending ordering.
*
* Regarding the Javadoc of {@link HttpObjectAggregator} it's needed to have
* the {@link HttpResponseEncoder} or {@link HttpRequestEncoder} before the
* {@link HttpObjectAggregator} in the {@link ChannelPipeline}.
*
* @param pipeline
* @param httpRequest
*/
private void initChannelPipeline(ChannelPipeline pipeline,
HttpRequest httpRequest) {
if (trafficHandler != null) {
pipeline.addLast("global-traffic-shaping", trafficHandler);
}
pipeline.addLast("bytesReadMonitor", bytesReadMonitor);
pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("decoder", new HeadAwareHttpResponseDecoder(
proxyServer.getMaxInitialLineLength(),
proxyServer.getMaxHeaderSize(),
proxyServer.getMaxChunkSize()));
// Enable aggregation for filtering if necessary
int numberOfBytesToBuffer = proxyServer.getFiltersSource()
.getMaximumResponseBufferSizeInBytes();
if (numberOfBytesToBuffer > 0) {
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
pipeline.addLast("responseReadMonitor", responseReadMonitor);
pipeline.addLast("requestWrittenMonitor", requestWrittenMonitor);
// Set idle timeout
pipeline.addLast(
"idle",
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
pipeline.addLast("handler", this);
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("http-decoder",new HttpResponseDecoder());
ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536));
ch.pipeline().addLast("json-decoder",new HttpJsonResponseDecoder(UserVo.class));
ch.pipeline().addLast("http-encoder",new HttpRequestEncoder());
ch.pipeline().addLast("json-encoder",new HttpJsonRequestEncoder());
ch.pipeline().addLast("handler",new ClientHandler());
}
@Override
public void channelRead0
(final ChannelHandlerContext ctx, final HttpRequest req) {
uri = req.getUri();
final Channel client = ctx.channel();
Bootstrap proxiedServer = new Bootstrap()
.group(client.eventLoop())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpRequestEncoder(), new Forwarder(uri, client));
}
});
ChannelFuture f = proxiedServer.connect(host);
proxiedChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ctx.channel().pipeline().remove(HttpResponseEncoder.class);
HttpRequest newReq = new DefaultFullHttpRequest(HTTP_1_1,
req.getMethod(), req.getUri());
newReq.headers().add(req.headers());
newReq.headers().set(CONNECTION, Values.CLOSE);
future.channel().writeAndFlush(newReq);
} else {
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
INTERNAL_SERVER_ERROR);
resp.headers().set(CONNECTION, Values.CLOSE);
LOG.info("Proxy " + uri + " failed. Cause: ", future.cause());
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
client.close();
}
}
});
}
@Override
public void channelRead0
(final ChannelHandlerContext ctx, final HttpRequest req) {
uri = req.getUri();
final Channel client = ctx.channel();
Bootstrap proxiedServer = new Bootstrap()
.group(client.eventLoop())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpRequestEncoder(), new Forwarder(uri, client));
}
});
ChannelFuture f = proxiedServer.connect(host);
proxiedChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ctx.channel().pipeline().remove(HttpResponseEncoder.class);
HttpRequest newReq = new DefaultFullHttpRequest(HTTP_1_1,
req.getMethod(), req.getUri());
newReq.headers().add(req.headers());
newReq.headers().set(CONNECTION, Values.CLOSE);
future.channel().writeAndFlush(newReq);
} else {
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
INTERNAL_SERVER_ERROR);
resp.headers().set(CONNECTION, Values.CLOSE);
LOG.info("Proxy " + uri + " failed. Cause: ", future.cause());
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
client.close();
}
}
});
}
protected static int determineHttpClientCodecOutboundState(HttpClientCodec currentCodec) {
try {
HttpRequestEncoder encoder = (HttpRequestEncoder) httpClientCodecOutboundHandlerField.get(currentCodec);
return httpObjectEncoderStateField.getInt(encoder);
}
catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
public void connect(int port) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast("http-decoder",
new HttpResponseDecoder());
ch.pipeline().addLast("http-aggregator",
new HttpObjectAggregator(65536));
// XML解码器
ch.pipeline().addLast(
"xml-decoder",
new HttpXmlResponseDecoder(Order.class,
true));
ch.pipeline().addLast("http-encoder",
new HttpRequestEncoder());
ch.pipeline().addLast("xml-encoder",
new HttpXmlRequestEncoder());
ch.pipeline().addLast("xmlClientHandler",
new HttpXmlClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(new InetSocketAddress(port)).sync();
// 当代客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
private void initChannelPipeline(ChannelPipeline pipeline, ServerChannelHandler serverChannelHandler,
int idleTimeoutMsec) {
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("idle", new IdleStateHandler(0, 0, idleTimeoutMsec / 1000));
pipeline.addLast("handler", serverChannelHandler);
}
/**
* Begins the opening handshake
*
* @param channel
* Channel
* @param promise
* the {@link ChannelPromise} to be notified when the opening handshake is sent
*/
public final ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
FullHttpRequest request = newHandshakeRequest();
HttpResponseDecoder decoder = channel.pipeline().get(HttpResponseDecoder.class);
if (decoder == null) {
HttpClientCodec codec = channel.pipeline().get(HttpClientCodec.class);
if (codec == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpResponseDecoder or HttpClientCodec"));
return promise;
}
}
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
if (ctx == null) {
ctx = p.context(HttpClientCodec.class);
}
if (ctx == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpRequestEncoder or HttpClientCodec"));
return;
}
p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
public static List<ByteBuf> encodeRequest(DefaultFullHttpRequest request) {
EmbeddedChannel channel = new EmbeddedChannel();
channel.pipeline().addLast("http request encoder", new HttpRequestEncoder());
channel.writeOutbound(request);
return extractBuffers(channel);
}
public static ByteBuf encodeRequest(HttpRequest request) {
EmbeddedChannel channel = new EmbeddedChannel();
channel.pipeline().addLast("http request encoder", new HttpRequestEncoder());
channel.writeOutbound(request);
channel.runPendingTasks();
return channel.readOutbound();
}
@Before
public void setUp() {
encoder = new ClientCodec();
channel = new EmbeddedChannel();
channel
.pipeline()
.addLast(new FrameLengthCodec())
.addLast(new Encoder())
// http encoder
.addLast(new HttpRequestEncoder())
.addLast(encoder);
}
@Override
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
/*
* If we use a HttpClientCodec here instead of the HttpRequestEncoder and the HttpResponseDecoder
* and there is a HttpProxyHandler in the pipeline, that ProxyHandler will add another HttpClientCodec
* for communication with the proxy. When the WebSocketClientHandshaker tries to exchange the codecs in
* the pipeline, it will mix up the two HttpRequestEncoders in the pipeline and exchange the wrong one.
* HttpReqestEncoder and HttpResponseDecoder has precedence over the HttpClientCodec, so the
* WebSocketClientHandshaker will remove these handlers inserted here and will leave the HttpClientCodec
* added by the HttpProxyHandler alone.
*/
pipeline.addLast(new HttpResponseDecoder());
pipeline.addLast(new HttpRequestEncoder());
pipeline.addLast(new HttpObjectAggregator(8192));
}
private void testHttpResponseAndFrameInSameBuffer(boolean codec) {
String url = "ws://localhost:9999/ws";
final WebSocketClientHandshaker shaker = newHandshaker(URI.create(url));
final WebSocketClientHandshaker handshaker = new WebSocketClientHandshaker(
shaker.uri(), shaker.version(), null, EmptyHttpHeaders.INSTANCE, Integer.MAX_VALUE) {
@Override
protected FullHttpRequest newHandshakeRequest() {
return shaker.newHandshakeRequest();
}
@Override
protected void verify(FullHttpResponse response) {
// Not do any verification, so we not need to care sending the correct headers etc in the test,
// which would just make things more complicated.
}
@Override
protected WebSocketFrameDecoder newWebsocketDecoder() {
return shaker.newWebsocketDecoder();
}
@Override
protected WebSocketFrameEncoder newWebSocketEncoder() {
return shaker.newWebSocketEncoder();
}
};
byte[] data = new byte[24];
PlatformDependent.threadLocalRandom().nextBytes(data);
// Create a EmbeddedChannel which we will use to encode a BinaryWebsocketFrame to bytes and so use these
// to test the actual handshaker.
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(url, null, false);
WebSocketServerHandshaker socketServerHandshaker = factory.newHandshaker(shaker.newHandshakeRequest());
EmbeddedChannel websocketChannel = new EmbeddedChannel(socketServerHandshaker.newWebSocketEncoder(),
socketServerHandshaker.newWebsocketDecoder());
assertTrue(websocketChannel.writeOutbound(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(data))));
byte[] bytes = "HTTP/1.1 101 Switching Protocols\r\nContent-Length: 0\r\n\r\n".getBytes(CharsetUtil.US_ASCII);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(bytes));
for (;;) {
ByteBuf frameBytes = websocketChannel.readOutbound();
if (frameBytes == null) {
break;
}
compositeByteBuf.addComponent(true, frameBytes);
}
EmbeddedChannel ch = new EmbeddedChannel(new HttpObjectAggregator(Integer.MAX_VALUE),
new SimpleChannelInboundHandler<FullHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
handshaker.finishHandshake(ctx.channel(), msg);
ctx.pipeline().remove(this);
}
});
if (codec) {
ch.pipeline().addFirst(new HttpClientCodec());
} else {
ch.pipeline().addFirst(new HttpRequestEncoder(), new HttpResponseDecoder());
}
// We need to first write the request as HttpClientCodec will fail if we receive a response before a request
// was written.
shaker.handshake(ch).syncUninterruptibly();
for (;;) {
// Just consume the bytes, we are not interested in these.
ByteBuf buf = ch.readOutbound();
if (buf == null) {
break;
}
buf.release();
}
assertTrue(ch.writeInbound(compositeByteBuf));
assertTrue(ch.finish());
BinaryWebSocketFrame frame = ch.readInbound();
ByteBuf expect = Unpooled.wrappedBuffer(data);
try {
assertEquals(expect, frame.content());
assertTrue(frame.isFinalFragment());
assertEquals(0, frame.rsv());
} finally {
expect.release();
frame.release();
}
}
@SuppressWarnings("FutureReturnValueIgnored")
private Channel acquireUploadChannel() throws InterruptedException {
Promise<Channel> channelReady = eventLoop.next().newPromise();
channelPool
.acquire()
.addListener(
(Future<Channel> channelAcquired) -> {
if (!channelAcquired.isSuccess()) {
channelReady.setFailure(channelAcquired.cause());
return;
}
try {
Channel ch = channelAcquired.getNow();
ChannelPipeline p = ch.pipeline();
if (!isChannelPipelineEmpty(p)) {
channelReady.setFailure(
new IllegalStateException("Channel pipeline is not empty."));
return;
}
p.addLast(new HttpResponseDecoder());
// The 10KiB limit was chosen at random. We only expect HTTP servers to respond with
// an error message in the body and that should always be less than 10KiB.
p.addLast(new HttpObjectAggregator(10 * 1024));
p.addLast(new HttpRequestEncoder());
p.addLast(new ChunkedWriteHandler());
synchronized (credentialsLock) {
p.addLast(new HttpUploadHandler(creds));
}
channelReady.setSuccess(ch);
} catch (Throwable t) {
channelReady.setFailure(t);
}
});
try {
return channelReady.get();
} catch (ExecutionException e) {
PlatformDependent.throwException(e.getCause());
return null;
}
}
/**
* Validates and finishes the opening handshake initiated by {@link #handshake}}.
*
* @param channel
* Channel
* @param response
* HTTP response containing the closing handshake details
*/
public final void finishHandshake(Channel channel, FullHttpResponse response) {
verify(response);
// Verify the subprotocol that we received from the server.
// This must be one of our expected subprotocols - or null/empty if we didn't want to speak a subprotocol
String receivedProtocol = response.headers().get(HttpHeaders.Names.SEC_WEBSOCKET_PROTOCOL);
receivedProtocol = receivedProtocol != null ? receivedProtocol.trim() : null;
String expectedProtocol = expectedSubprotocol != null ? expectedSubprotocol : "";
boolean protocolValid = false;
if (expectedProtocol.isEmpty() && receivedProtocol == null) {
// No subprotocol required and none received
protocolValid = true;
setActualSubprotocol(expectedSubprotocol); // null or "" - we echo what the user requested
} else if (!expectedProtocol.isEmpty() && receivedProtocol != null && !receivedProtocol.isEmpty()) {
// We require a subprotocol and received one -> verify it
for (String protocol : StringUtil.split(expectedSubprotocol, ',')) {
if (protocol.trim().equals(receivedProtocol)) {
protocolValid = true;
setActualSubprotocol(receivedProtocol);
break;
}
}
} // else mixed cases - which are all errors
if (!protocolValid) {
throw new WebSocketHandshakeException(String.format(
"Invalid subprotocol. Actual: %s. Expected one of: %s",
receivedProtocol, expectedSubprotocol));
}
setHandshakeComplete();
ChannelPipeline p = channel.pipeline();
// Remove decompressor from pipeline if its in use
HttpContentDecompressor decompressor = p.get(HttpContentDecompressor.class);
if (decompressor != null) {
p.remove(decompressor);
}
// Remove aggregator if present before
HttpObjectAggregator aggregator = p.get(HttpObjectAggregator.class);
if (aggregator != null) {
p.remove(aggregator);
}
ChannelHandlerContext ctx = p.context(HttpResponseDecoder.class);
if (ctx == null) {
ctx = p.context(HttpClientCodec.class);
if (ctx == null) {
throw new IllegalStateException("ChannelPipeline does not contain " +
"a HttpRequestEncoder or HttpClientCodec");
}
p.replace(ctx.name(), "ws-decoder", newWebsocketDecoder());
} else {
if (p.get(HttpRequestEncoder.class) != null) {
p.remove(HttpRequestEncoder.class);
}
p.replace(ctx.name(),
"ws-decoder", newWebsocketDecoder());
}
}
@SuppressWarnings("FutureReturnValueIgnored")
private Channel acquireUploadChannel() throws InterruptedException {
Promise<Channel> channelReady = eventLoop.next().newPromise();
channelPool
.acquire()
.addListener(
(Future<Channel> channelAcquired) -> {
if (!channelAcquired.isSuccess()) {
channelReady.setFailure(channelAcquired.cause());
return;
}
try {
Channel ch = channelAcquired.getNow();
ChannelPipeline p = ch.pipeline();
if (!isChannelPipelineEmpty(p)) {
channelReady.setFailure(
new IllegalStateException("Channel pipeline is not empty."));
return;
}
p.addFirst(
"timeout-handler",
new IdleTimeoutHandler(timeoutSeconds, WriteTimeoutException.INSTANCE));
p.addLast(new HttpResponseDecoder());
// The 10KiB limit was chosen arbitrarily. We only expect HTTP servers to respond
// with an error message in the body, and that should always be less than 10KiB. If
// the response is larger than 10KiB, HttpUploadHandler will catch the
// TooLongFrameException that HttpObjectAggregator throws and convert it to an
// IOException.
p.addLast(new HttpObjectAggregator(10 * 1024));
p.addLast(new HttpRequestEncoder());
p.addLast(new ChunkedWriteHandler());
synchronized (credentialsLock) {
p.addLast(new HttpUploadHandler(creds, extraHttpHeaders));
}
if (!ch.eventLoop().inEventLoop()) {
// If addLast is called outside an event loop, then it doesn't complete until the
// event loop is run again. In that case, a message sent to the last handler gets
// delivered to the last non-pending handler, which will most likely end up
// throwing UnsupportedMessageTypeException. Therefore, we only complete the
// promise in the event loop.
ch.eventLoop().execute(() -> channelReady.setSuccess(ch));
} else {
channelReady.setSuccess(ch);
}
} catch (Throwable t) {
channelReady.setFailure(t);
}
});
try {
return channelReady.get();
} catch (ExecutionException e) {
PlatformDependent.throwException(e.getCause());
return null;
}
}