下面列出了怎么用 io.netty.handler.codec.http.HttpChunkedInput 的API类实例代码及写法,或者点击链接到github查看源代码。
private void uploadsShouldWork(boolean casUpload, EmbeddedChannel ch, HttpResponseStatus status)
throws Exception {
ByteArrayInputStream data = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(new UploadCommand(CACHE_URI, casUpload, "abcdef", data, 5), writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.method()).isEqualTo(HttpMethod.PUT);
assertThat(request.headers().get(HttpHeaders.CONNECTION))
.isEqualTo(HttpHeaderValues.KEEP_ALIVE.toString());
HttpChunkedInput content = ch.readOutbound();
assertThat(content.readChunk(ByteBufAllocator.DEFAULT).content().readableBytes()).isEqualTo(5);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ch.writeInbound(response);
assertThat(writePromise.isDone()).isTrue();
assertThat(ch.isOpen()).isTrue();
}
/** Test that the handler correctly supports http error codes i.e. 404 (NOT FOUND). */
@Test
public void httpErrorsAreSupported() {
EmbeddedChannel ch = new EmbeddedChannel(new HttpUploadHandler(null, ImmutableList.of()));
ByteArrayInputStream data = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(new UploadCommand(CACHE_URI, true, "abcdef", data, 5), writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request).isInstanceOf(HttpRequest.class);
HttpChunkedInput content = ch.readOutbound();
assertThat(content).isInstanceOf(HttpChunkedInput.class);
FullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN);
response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.CLOSE);
ch.writeInbound(response);
assertThat(writePromise.isDone()).isTrue();
assertThat(writePromise.cause()).isInstanceOf(HttpException.class);
assertThat(((HttpException) writePromise.cause()).response().status())
.isEqualTo(HttpResponseStatus.FORBIDDEN);
assertThat(ch.isOpen()).isFalse();
}
/**
* Test that the handler correctly supports http error codes i.e. 404 (NOT FOUND) with a
* Content-Length header.
*/
@Test
public void httpErrorsWithContentAreSupported() {
EmbeddedChannel ch = new EmbeddedChannel(new HttpUploadHandler(null, ImmutableList.of()));
ByteArrayInputStream data = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(new UploadCommand(CACHE_URI, true, "abcdef", data, 5), writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request).isInstanceOf(HttpRequest.class);
HttpChunkedInput content = ch.readOutbound();
assertThat(content).isInstanceOf(HttpChunkedInput.class);
ByteBuf errorMsg = ByteBufUtil.writeAscii(ch.alloc(), "error message");
FullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, errorMsg);
response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ch.writeInbound(response);
assertThat(writePromise.isDone()).isTrue();
assertThat(writePromise.cause()).isInstanceOf(HttpException.class);
assertThat(((HttpException) writePromise.cause()).response().status())
.isEqualTo(HttpResponseStatus.NOT_FOUND);
assertThat(ch.isOpen()).isTrue();
}
/**
* 输出文件响应
*
* @param responseEntity
* @return
* @throws IOException
*/
private ChannelFuture writeFileResponse(ResponseEntity<?> responseEntity) throws IOException {
RandomAccessFile raf = (RandomAccessFile) responseEntity.getBody();
long fileLength = raf.length();
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
HttpUtil.setContentLength(response, fileLength);
if(responseEntity.getMimetype() != null && !responseEntity.getMimetype().trim().equals("")) {
response.headers().set(HttpHeaderNames.CONTENT_TYPE, responseEntity.getMimetype());
}
if(responseEntity.getFileName() != null && !responseEntity.getFileName().trim().equals("")) {
String fileName = new String(responseEntity.getFileName().getBytes("gb2312"), "ISO8859-1");
response.headers().set(HttpHeaderNames.CONTENT_DISPOSITION, "attachment; filename=" + fileName);
}
if (HttpUtil.isKeepAlive(HttpContextHolder.getRequest())) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ChannelHandlerContext ctx = HttpContextHolder.getResponse().getChannelHandlerContext();
ctx.write(response);
ChannelFuture sendFileFuture;
ChannelFuture lastContentFuture = null;
if (ctx.pipeline().get(SslHandler.class) == null) {
sendFileFuture =
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
// Write the end marker.
lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
} else {
sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
ctx.newProgressivePromise());
// HttpChunkedInput will write the end marker (LastHttpContent) for us.
lastContentFuture = sendFileFuture;
}
return lastContentFuture;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
checkState(userPromise == null, "handler can't be shared between pipelines.");
userPromise = promise;
if (!(msg instanceof UploadCommand)) {
failAndResetUserPromise(
new IllegalArgumentException(
"Unsupported message type: " + StringUtil.simpleClassName(msg)));
return;
}
HttpRequest request = buildRequest((UploadCommand) msg);
addCredentialHeaders(request, ((UploadCommand) msg).uri());
HttpChunkedInput body = buildBody((UploadCommand) msg);
ctx.writeAndFlush(request)
.addListener(
(f) -> {
if (f.isSuccess()) {
return;
}
failAndClose(f.cause(), ctx);
});
ctx.writeAndFlush(body)
.addListener(
(f) -> {
if (f.isSuccess()) {
return;
}
failAndClose(f.cause(), ctx);
});
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
checkState(userPromise == null, "handler can't be shared between pipelines.");
userPromise = promise;
if (!(msg instanceof UploadCommand)) {
failAndResetUserPromise(
new IllegalArgumentException(
"Unsupported message type: " + StringUtil.simpleClassName(msg)));
return;
}
UploadCommand cmd = (UploadCommand) msg;
path = constructPath(cmd.uri(), cmd.hash(), cmd.casUpload());
contentLength = cmd.contentLength();
HttpRequest request = buildRequest(path, constructHost(cmd.uri()), contentLength);
addCredentialHeaders(request, cmd.uri());
addExtraRemoteHeaders(request);
addUserAgentHeader(request);
HttpChunkedInput body = buildBody(cmd);
ctx.writeAndFlush(request)
.addListener(
(f) -> {
if (f.isSuccess()) {
return;
}
failAndClose(f.cause(), ctx);
});
ctx.writeAndFlush(body)
.addListener(
(f) -> {
if (f.isSuccess()) {
return;
}
failAndClose(f.cause(), ctx);
});
}
private HttpChunkedInput buildBody(UploadCommand msg) {
return new HttpChunkedInput(new ChunkedStream(msg.data()));
}
private HttpChunkedInput buildBody(UploadCommand msg) {
return new HttpChunkedInput(new ChunkedStream(msg.data()));
}