下面列出了怎么用 io.netty.handler.codec.http.HttpObject 的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public HttpObject serverToProxyResponse(HttpObject httpObject) {
if (httpObject instanceof HttpResponse) {
httpResponse = (HttpResponse) httpObject;
captureContentEncoding(httpResponse);
}
if (httpObject instanceof HttpContent) {
HttpContent httpContent = (HttpContent) httpObject;
storeResponseContent(httpContent);
if (httpContent instanceof LastHttpContent) {
LastHttpContent lastContent = (LastHttpContent) httpContent;
captureTrailingHeaders(lastContent);
captureFullResponseContents();
}
}
return super.serverToProxyResponse(httpObject);
}
private boolean writeResponse(HttpObject currentObj, ChannelHandlerContext ctx) {
// Decide whether to close the connection or not.
boolean keepAlive = HttpHeaders.isKeepAlive(request);
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
currentObj.getDecoderResult().isSuccess() ? OK : BAD_REQUEST, Unpooled.copiedBuffer(
apiResult.toString(), CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "application/json; charset=UTF-8");
if (keepAlive) {
// Add 'Content-Length' header only for a keep-alive connection.
response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
// Add keep alive header as per:
// -
// http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
// Write the response.
ctx.write(response);
return keepAlive;
}
@Override
public HttpResponse clientToProxyRequest(HttpObject httpObject) {
// only filter when the original HttpRequest comes through. the RequestFilterAdapter is not designed to filter
// any subsequent HttpContents.
if (httpObject instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) httpObject;
HttpMessageContents contents;
if (httpObject instanceof FullHttpMessage) {
FullHttpMessage httpContent = (FullHttpMessage) httpObject;
contents = new HttpMessageContents(httpContent);
} else {
// the HTTP object is not a FullHttpMessage, which means that message contents are not available on this request and cannot be modified.
contents = null;
}
HttpMessageInfo messageInfo = new HttpMessageInfo(originalRequest, ctx, isHttps(), getFullUrl(httpRequest), getOriginalUrl());
HttpResponse response = requestFilter.filterRequest(httpRequest, contents, messageInfo);
if (response != null) {
return response;
}
}
return null;
}
@Override
public HttpObject serverToProxyResponse(HttpObject httpObject) {
HttpObject processedHttpObject = httpObject;
for (HttpFilters filter : filters) {
try {
processedHttpObject = filter.serverToProxyResponse(processedHttpObject);
if (processedHttpObject == null) {
return null;
}
} catch (RuntimeException e) {
log.warn("Filter in filter chain threw exception. Filter method may have been aborted.", e);
}
}
return processedHttpObject;
}
@Override
public HttpResponse clientToProxyRequest(HttpObject httpObject) {
if (httpObject instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) httpObject;
String url = getFullUrl(httpRequest);
for (BlacklistEntry entry : blacklistedUrls) {
if (HttpMethod.CONNECT.equals(httpRequest.getMethod()) && entry.getHttpMethodPattern() == null) {
// do not allow CONNECTs to be blacklisted unless a method pattern is explicitly specified
continue;
}
if (entry.matches(url, httpRequest.getMethod().name())) {
HttpResponseStatus status = HttpResponseStatus.valueOf(entry.getStatusCode());
HttpResponse resp = new DefaultFullHttpResponse(httpRequest.getProtocolVersion(), status);
HttpHeaders.setContentLength(resp, 0L);
return resp;
}
}
}
return null;
}
@Override
public HttpObject proxyToClientResponse(HttpObject httpObject) {
HttpObject processedHttpObject = httpObject;
for (HttpFilters filter : filters) {
try {
processedHttpObject = filter.proxyToClientResponse(processedHttpObject);
if (processedHttpObject == null) {
return null;
}
} catch (RuntimeException e) {
log.warn("Filter in filter chain threw exception. Filter method may have been aborted.", e);
}
}
return processedHttpObject;
}
public static Bootstrap createNettyHttpClientBootstrap() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpClientCodec());
p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
p.addLast("clientResponseHandler", new SimpleChannelInboundHandler<HttpObject>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
throw new RuntimeException("Client response handler was not setup before the call");
}
});
}
});
return bootstrap;
}
@Before
public void setup() throws Exception {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpClientCodec());
p.addLast(new HttpContentDecompressor());
p.addLast(new SimpleChannelInboundHandler<HttpObject>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
response = prepareResponse(ctx, msg, response);
}
});
}
});
channel = b.connect(HOST, PORT)
.sync()
.channel();
}
private void writeResponse(ChannelHandlerContext ctx, LastHttpContent trailer, StringBuilder responseData) {
boolean keepAlive = HttpUtil.isKeepAlive(request);
FullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1, ((HttpObject) trailer).decoderResult()
.isSuccess() ? OK : BAD_REQUEST, Unpooled.copiedBuffer(responseData.toString(), CharsetUtil.UTF_8));
httpResponse.headers()
.set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
if (keepAlive) {
httpResponse.headers()
.setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content()
.readableBytes());
httpResponse.headers()
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.write(httpResponse);
if (!keepAlive) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public HttpObject proxyToClientResponse(HttpObject httpObject) {
if (httpObject instanceof HttpResponse) {
if (latencyMs > 0) {
try {
TimeUnit.MILLISECONDS.sleep(latencyMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while adding latency to response", e);
}
}
}
return super.proxyToClientResponse(httpObject);
}
@Override
public HttpObject serverToProxyResponse(HttpObject httpObject) {
if (httpObject instanceof HttpResponse) {
httpResponse = (HttpResponse) httpObject;
captureContentEncoding(httpResponse);
}
if (httpObject instanceof HttpContent) {
HttpContent httpContent = (HttpContent) httpObject;
storeResponseContent(httpContent);
if (httpContent instanceof LastHttpContent) {
LastHttpContent lastContent = (LastHttpContent) httpContent;
captureTrailingHeaders(lastContent);
captureFullResponseContents();
}
}
return super.serverToProxyResponse(httpObject);
}
@Override
public HttpResponse clientToProxyRequest(HttpObject httpObject) {
if (httpObject instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) httpObject;
if (ProxyUtils.isCONNECT(httpRequest)) {
Attribute<String> hostname = ctx.attr(AttributeKey.<String>valueOf(HttpsAwareFiltersAdapter.HOST_ATTRIBUTE_NAME));
String hostAndPort = httpRequest.getUri();
// CONNECT requests contain the port, even when using the default port. a sensible default is to remove the
// default port, since in most cases it is not explicitly specified and its presence (in a HAR file, for example)
// would be unexpected.
String hostNoDefaultPort = BrowserMobHttpUtil.removeMatchingPort(hostAndPort, 443);
hostname.set(hostNoDefaultPort);
}
}
return null;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
try {
if (msg instanceof HttpRequest) {
initFileChannel();
} else if (msg instanceof HttpContent) {
if (fileChnl == null) {
initFileChannel();
}
ByteBuf byteBuf = ((HttpContent) msg).content();
writeBytesToFile(byteBuf);
} else if (msg instanceof LastHttpContent) {
if (fileChnl != null && outStream != null) {
fileChnl.close();
outStream.close();
}
ctx.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
Throwable getDecoderFailure(HttpObject httpObject) {
if (httpObject == null) {
return null;
}
DecoderResult decoderResult = httpObject.decoderResult();
if (decoderResult == null) {
return null;
}
if (!decoderResult.isFailure()) {
return null;
}
return decoderResult.cause();
}
@Override
public HttpObject proxyToClientResponse(HttpObject httpObject) {
HttpObject processedHttpObject = httpObject;
for (HttpFilters filter : filters) {
try {
processedHttpObject = filter.proxyToClientResponse(processedHttpObject);
if (processedHttpObject == null) {
return null;
}
} catch (RuntimeException e) {
log.warn("Filter in filter chain threw exception. Filter method may have been aborted.", e);
}
}
return processedHttpObject;
}
@Override
public HttpObject serverToProxyResponse(HttpObject httpObject) {
if (httpObject instanceof HttpResponse) {
httpResponse = (HttpResponse) httpObject;
captureContentEncoding(httpResponse);
}
if (httpObject instanceof HttpContent) {
HttpContent httpContent = (HttpContent) httpObject;
storeResponseContent(httpContent);
if (httpContent instanceof LastHttpContent) {
LastHttpContent lastContent = (LastHttpContent) httpContent;
captureTrailingHeaders(lastContent);
captureFullResponseContents();
}
}
return super.serverToProxyResponse(httpObject);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject)
throws Exception {
// initial request
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("%s: Reading from client %s", System.currentTimeMillis(), httpObject));
}
if (httpObject instanceof HttpRequest) {
HttpRequest initialRequest = (HttpRequest) httpObject;
if (_channelHandlerDelegate == null) {
_channelHandlerDelegate =
HandlerDelegateFactory.create(initialRequest, _channelMediator, _connectionFlowRegistry);
_channelHandlerDelegate.onCreate();
}
}
_channelHandlerDelegate.onRead(httpObject);
}
/**
* This method takes care of closing client to proxy and/or proxy to server
* connections after finishing a write.
*/
private void closeConnectionsAfterWriteIfNecessary(
ProxyToServerConnection serverConnection,
HttpRequest currentHttpRequest, HttpResponse currentHttpResponse,
HttpObject httpObject) {
boolean closeServerConnection = shouldCloseServerConnection(
currentHttpRequest, currentHttpResponse, httpObject);
boolean closeClientConnection = shouldCloseClientConnection(
currentHttpRequest, currentHttpResponse, httpObject);
if (closeServerConnection) {
LOG.debug("Closing remote connection after writing to client");
serverConnection.disconnect();
}
if (closeClientConnection) {
LOG.debug("Closing connection to client after writes");
disconnect();
}
}
@Override
public void handleReadFromClient(ChannelMediator channelMediator, HttpObject httpObject) {
if (channelMediator == null) {
throw new IllegalStateException("HRFC: ChannelMediator can't be null");
}
try {
if (httpObject instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) httpObject;
_clientRequestBuilder.interpretHttpRequest(httpRequest);
_clientRequestBuilder.addHeaders(httpRequest);
}
if (httpObject instanceof HttpContent) {
_clientRequestBuilder.appendHttpContent((HttpContent) httpObject);
}
} catch (IOException e) {
throw new RuntimeException("HRFC: Failed to record HttpContent", e);
}
channelMediator.writeToServer(httpObject);
}
@Override
public HttpResponse clientToProxyRequest(HttpObject httpObject) {
if (httpObject instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) httpObject;
String url = getFullUrl(httpRequest);
for (BlacklistEntry entry : blacklistedUrls) {
if (HttpMethod.CONNECT.equals(httpRequest.getMethod()) && entry.getHttpMethodPattern() == null) {
// do not allow CONNECTs to be blacklisted unless a method pattern is explicitly specified
continue;
}
if (entry.matches(url, httpRequest.getMethod().name())) {
HttpResponseStatus status = HttpResponseStatus.valueOf(entry.getStatusCode());
HttpResponse resp = new DefaultFullHttpResponse(httpRequest.getProtocolVersion(), status);
HttpHeaders.setContentLength(resp, 0L);
return resp;
}
}
}
return null;
}
@Test
public void doChannelRead_checks_for_fully_send_responses_but_does_nothing_else_if_msg_is_not_HttpRequest_or_HttpContent() {
// given
HttpObject msgMock = mock(HttpObject.class);
// when
PipelineContinuationBehavior result = handler.doChannelRead(ctxMock, msgMock);
// then
verify(ctxMock, times(2)).channel();
verifyNoMoreInteractions(ctxMock);
verify(stateMock).isResponseSendingLastChunkSent();
verifyNoMoreInteractions(stateMock);
verifyNoMoreInteractions(msgMock);
assertThat(result).isEqualTo(PipelineContinuationBehavior.CONTINUE);
}
@Override
public HttpObject serverToProxyResponse(HttpObject httpObject) {
HttpObject processedHttpObject = httpObject;
for (HttpFilters filter : filters) {
try {
processedHttpObject = filter.serverToProxyResponse(processedHttpObject);
if (processedHttpObject == null) {
return null;
}
} catch (RuntimeException e) {
log.warn("Filter in filter chain threw exception. Filter method may have been aborted.", e);
}
}
return processedHttpObject;
}
@Override
public HttpResponse clientToProxyRequest(HttpObject httpObject) {
if (httpObject instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) httpObject;
if (ProxyUtils.isCONNECT(httpRequest)) {
Attribute<String> hostname = ctx.attr(AttributeKey.<String>valueOf(HttpsAwareFiltersAdapter.HOST_ATTRIBUTE_NAME));
String hostAndPort = httpRequest.getUri();
// CONNECT requests contain the port, even when using the default port. a sensible default is to remove the
// default port, since in most cases it is not explicitly specified and its presence (in a HAR file, for example)
// would be unexpected.
String hostNoDefaultPort = BrowserMobHttpUtil.removeMatchingPort(hostAndPort, 443);
hostname.set(hostNoDefaultPort);
}
}
return null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpObject) {
Response response = wrapResponse(ctx, (HttpObject) msg);
ctx.fireChannelRead(response);
getProxyRequestQueue(ctx).onResponseDrainNext(ctx, response);
} else {
ctx.fireChannelRead(msg);
}
}
static StringBuilder evaluateDecoderResult(HttpObject o) {
StringBuilder responseData = new StringBuilder();
DecoderResult result = o.decoderResult();
if (!result.isSuccess()) {
responseData.append("..Decoder Failure: ");
responseData.append(result.cause());
responseData.append("\r\n");
}
return responseData;
}
@Override
public HttpResponse clientToProxyRequest(HttpObject httpObject) {
if (httpObject instanceof HttpRequest) {
// store the CONNECT start time in case of failure, so we can populate the HarEntry with it
requestStartTime = new Date();
}
return null;
}
@Override
public HttpObject proxyToClientResponse(HttpObject httpObject) {
if (httpObject instanceof LastHttpContent) {
activityMonitor.requestFinished();
}
return super.proxyToClientResponse(httpObject);
}
@Override
public HttpResponse clientToProxyRequest(HttpObject httpObject) {
if (httpObject instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) httpObject;
additionalHeaders.forEach((key, value) -> httpRequest.headers().add(key, value));
}
return null;
}
@Override
public HttpResponse clientToProxyRequest(HttpObject httpObject) {
if (proxyServer.isStopped()) {
log.warn("Aborting request to {} because proxy is stopped", originalRequest.getUri());
HttpResponse abortedResponse = new DefaultFullHttpResponse(originalRequest.getProtocolVersion(), HttpResponseStatus.SERVICE_UNAVAILABLE);
HttpHeaders.setContentLength(abortedResponse, 0L);
return abortedResponse;
}
for (HttpFilters filter : filters) {
try {
HttpResponse filterResponse = filter.clientToProxyRequest(httpObject);
if (filterResponse != null) {
// if we are short-circuiting the response to an HttpRequest, update ModifiedRequestAwareFilter instances
// with this (possibly) modified HttpRequest before returning the short-circuit response
if (httpObject instanceof HttpRequest) {
updateFiltersWithModifiedResponse((HttpRequest) httpObject);
}
return filterResponse;
}
} catch (RuntimeException e) {
log.warn("Filter in filter chain threw exception. Filter method may have been aborted.", e);
}
}
// if this httpObject is the HTTP request, set the modified request object on all ModifiedRequestAwareFilter
// instances, so they have access to all modifications the request filters made while filtering
if (httpObject instanceof HttpRequest) {
updateFiltersWithModifiedResponse((HttpRequest) httpObject);
}
return null;
}
@Override
public HttpResponse clientToProxyRequest(HttpObject httpObject) {
if (httpObject instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) httpObject;
for (Map.Entry<String, String> header : additionalHeaders.entrySet()) {
httpRequest.headers().add(header.getKey(), header.getValue());
}
}
return null;
}