下面列出了怎么用io.netty.channel.DefaultFileRegion的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Verifies that writing a {@link FileRegion} through a {@link FileRegionEncoder} produces a
 * {@link ByteBuf} on the outbound side whose content equals the bytes stored in the file.
 *
 * @throws IOException if the temporary file cannot be created or written.
 */
@Test
public void testEncode() throws IOException {
    FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
    EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
    File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
    file.deleteOnExit();
    Random random = new Random(System.currentTimeMillis());
    int dataLength = 1 << 10;
    byte[] data = new byte[dataLength];
    random.nextBytes(data);
    write(file, data);
    FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
    Assert.assertEquals(0, fileRegion.transfered());
    Assert.assertEquals(dataLength, fileRegion.count());
    Assert.assertTrue(channel.writeOutbound(fileRegion));
    ByteBuf out = (ByteBuf) channel.readOutbound();
    // Fail with a readable message instead of a NullPointerException when nothing was encoded.
    Assert.assertNotNull("Encoder should have produced a ByteBuf", out);
    try {
        byte[] arr = new byte[out.readableBytes()];
        // Copy from the reader index so the copy always matches readableBytes().
        out.getBytes(out.readerIndex(), arr);
        Assert.assertArrayEquals("Data should be identical", data, arr);
    } finally {
        // The buffer read from the channel is reference counted; release it so the test does not leak it.
        out.release();
    }
}
/**
 * Verifies that writing a {@link FileRegion} through a {@link FileRegionEncoder} produces a
 * {@link ByteBuf} on the outbound side whose content equals the bytes stored in the file.
 *
 * @throws IOException if the temporary file cannot be created or written.
 */
@Test
public void testEncode() throws IOException {
    FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
    EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
    File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
    file.deleteOnExit();
    Random random = new Random(System.currentTimeMillis());
    int dataLength = 1 << 10;
    byte[] data = new byte[dataLength];
    random.nextBytes(data);
    write(file, data);
    FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
    Assert.assertEquals(0, fileRegion.transfered());
    Assert.assertEquals(dataLength, fileRegion.count());
    Assert.assertTrue(channel.writeOutbound(fileRegion));
    ByteBuf out = (ByteBuf) channel.readOutbound();
    // Fail with a readable message instead of a NullPointerException when nothing was encoded.
    Assert.assertNotNull("Encoder should have produced a ByteBuf", out);
    try {
        byte[] arr = new byte[out.readableBytes()];
        // Copy from the reader index so the copy always matches readableBytes().
        out.getBytes(out.readerIndex(), arr);
        Assert.assertArrayEquals("Data should be identical", data, arr);
    } finally {
        // The buffer read from the channel is reference counted; release it so the test does not leak it.
        out.release();
    }
}
/**
 * Verifies that writing a {@link FileRegion} through a {@link FileRegionEncoder} produces a
 * {@link ByteBuf} on the outbound side whose content equals the bytes stored in the file.
 *
 * @throws IOException if the temporary file cannot be created or written.
 */
@Test
public void testEncode() throws IOException {
    FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
    EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
    File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
    file.deleteOnExit();
    Random random = new Random(System.currentTimeMillis());
    int dataLength = 1 << 10;
    byte[] data = new byte[dataLength];
    random.nextBytes(data);
    write(file, data);
    FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
    Assert.assertEquals(0, fileRegion.transfered());
    Assert.assertEquals(dataLength, fileRegion.count());
    Assert.assertTrue(channel.writeOutbound(fileRegion));
    ByteBuf out = (ByteBuf) channel.readOutbound();
    // Fail with a readable message instead of a NullPointerException when nothing was encoded.
    Assert.assertNotNull("Encoder should have produced a ByteBuf", out);
    try {
        byte[] arr = new byte[out.readableBytes()];
        // Copy from the reader index so the copy always matches readableBytes().
        out.getBytes(out.readerIndex(), arr);
        Assert.assertArrayEquals("Data should be identical", data, arr);
    } finally {
        // The buffer read from the channel is reference counted; release it so the test does not leak it.
        out.release();
    }
}
/**
 * Sends the not-yet-transferred remainder of a {@link DefaultFileRegion} with a single
 * {@code socket.sendFile(...)} call.
 * @param in the outbound buffer holding {@code region}; progress is reported to it and its current entry is
 * removed once the region has been transferred completely.
 * @param region the {@link DefaultFileRegion} from which the bytes should be written
 * @return The value that should be decremented from the write quantum which starts at
 * {@link ChannelConfig#getWriteSpinCount()}:
 * <ul>
 *     <li>0 - the region was already fully transferred, so no write was attempted</li>
 *     <li>1 - a single write call was made and it accepted data</li>
 *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - a write was attempted, but no data was accepted</li>
 * </ul>
 * @throws Exception if the underlying write fails.
 */
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
    final long total = region.count();
    final long alreadySent = region.transferred();
    if (alreadySent >= total) {
        // Nothing left to send: drop the entry without touching the socket.
        in.remove();
        return 0;
    }

    final long written = socket.sendFile(region, region.position(), alreadySent, total - alreadySent);
    if (written <= 0) {
        return WRITE_STATUS_SNDBUF_FULL;
    }

    in.progress(written);
    // Re-read the counter: presumably sendFile advanced it for the bytes just written.
    if (region.transferred() >= total) {
        in.remove();
    }
    return 1;
}
/**
 * Sends the not-yet-transferred remainder of a {@link DefaultFileRegion} with a single
 * {@code socket.sendFile(...)} call.
 * @param in the outbound buffer holding {@code region}; progress is reported to it and its current entry is
 * removed once the region has been transferred completely.
 * @param region the {@link DefaultFileRegion} from which the bytes should be written
 * @return The value that should be decremented from the write quantum which starts at
 * {@link ChannelConfig#getWriteSpinCount()}:
 * <ul>
 *     <li>0 - the region was already fully transferred, so no write was attempted</li>
 *     <li>1 - a single write call was made and it accepted data</li>
 *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - a write was attempted, but no data was accepted</li>
 * </ul>
 * @throws Exception if the underlying write fails.
 */
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
    final long total = region.count();
    final long alreadySent = region.transferred();
    if (alreadySent >= total) {
        // Nothing left to send: drop the entry without touching the socket.
        in.remove();
        return 0;
    }

    final long written = socket.sendFile(region, region.position(), alreadySent, total - alreadySent);
    if (written <= 0) {
        return WRITE_STATUS_SNDBUF_FULL;
    }

    in.progress(written);
    // Re-read the counter: presumably sendFile advanced it for the bytes just written.
    if (region.transferred() >= total) {
        in.remove();
    }
    return 1;
}
/**
 * Writes the current entry of the outbound buffer, dispatching on its runtime type.
 * @param in the collection which contains objects to write.
 * @return The value that should be decremented from the write quantum which starts at
 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
 * <ul>
 *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
 *     is encountered</li>
 *     <li>1 - if a single call to write data was made to the OS</li>
 *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
 *     no data was accepted</li>
 * </ul>
 * @throws Exception If an I/O error occurs.
 */
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
    // The outbound buffer contains only one message or it contains a file region.
    final Object current = in.current();
    if (current instanceof ByteBuf) {
        return writeBytes(in, (ByteBuf) current);
    }
    // The specialised DefaultFileRegion path is tried before the general FileRegion one.
    if (current instanceof DefaultFileRegion) {
        return writeDefaultFileRegion(in, (DefaultFileRegion) current);
    }
    if (current instanceof FileRegion) {
        return writeFileRegion(in, (FileRegion) current);
    }
    if (current instanceof SpliceOutTask) {
        if (((SpliceOutTask) current).spliceOut()) {
            in.remove();
            return 1;
        }
        return WRITE_STATUS_SNDBUF_FULL;
    }
    // Should never reach here.
    throw new Error();
}
/**
 * Verifies that writing a {@link FileRegion} through a {@link FileRegionEncoder} produces a
 * {@link ByteBuf} on the outbound side whose content equals the bytes stored in the file.
 *
 * @throws IOException if the temporary file cannot be created or written.
 */
@Test
public void testEncode() throws IOException {
    FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
    EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
    File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
    file.deleteOnExit();
    Random random = new Random(System.currentTimeMillis());
    int dataLength = 1 << 10;
    byte[] data = new byte[dataLength];
    random.nextBytes(data);
    write(file, data);
    FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
    Assert.assertEquals(0, fileRegion.transfered());
    Assert.assertEquals(dataLength, fileRegion.count());
    Assert.assertTrue(channel.writeOutbound(fileRegion));
    ByteBuf out = (ByteBuf) channel.readOutbound();
    // Fail with a readable message instead of a NullPointerException when nothing was encoded.
    Assert.assertNotNull("Encoder should have produced a ByteBuf", out);
    try {
        byte[] arr = new byte[out.readableBytes()];
        // Copy from the reader index so the copy always matches readableBytes().
        out.getBytes(out.readerIndex(), arr);
        Assert.assertArrayEquals("Data should be identical", data, arr);
    } finally {
        // The buffer read from the channel is reference counted; release it so the test does not leak it.
        out.release();
    }
}
/**
 * Converts a supported content object into the object that is written to the channel.
 * <ul>
 *     <li>{@link File} - a {@link DefaultFileRegion} covering the whole file</li>
 *     <li>{@link InputStream} - a {@link ChunkedStream}</li>
 *     <li>{@link ReadableByteChannel} - a {@link ChunkedNioStream}</li>
 *     <li>{@code byte[]} - a buffer wrapping the array (via {@code Unpooled.wrappedBuffer})</li>
 * </ul>
 *
 * @param content the content to convert; must not be {@code null}
 * @return the converted object
 * @throws NullPointerException if {@code content} is {@code null}
 * @throws IllegalArgumentException if {@code content} is of none of the supported types
 */
private static Object toContent(Object content) {
    if (content == null) {
        // Without this guard null fell through every instanceof check and blew up inside
        // content.getClass() while the IllegalArgumentException message was being built,
        // yielding a NullPointerException without any message.
        throw new NullPointerException("content");
    }
    if (content instanceof File) {
        File file = (File) content;
        return new DefaultFileRegion(file, 0, file.length());
    }
    if (content instanceof InputStream) {
        return new ChunkedStream((InputStream) content);
    }
    if (content instanceof ReadableByteChannel) {
        return new ChunkedNioStream((ReadableByteChannel) content);
    }
    if (content instanceof byte[]) {
        return Unpooled.wrappedBuffer((byte[]) content);
    }
    throw new IllegalArgumentException(
            "unknown content type : " + content.getClass().getName());
}
/**
 * Converts a supported content object into the object that is written to the channel.
 * <ul>
 *     <li>{@link File} - a {@link DefaultFileRegion} covering the whole file</li>
 *     <li>{@link InputStream} - a {@link ChunkedStream}</li>
 *     <li>{@link ReadableByteChannel} - a {@link ChunkedNioStream}</li>
 *     <li>{@code byte[]} - a buffer wrapping the array (via {@code Unpooled.wrappedBuffer})</li>
 * </ul>
 *
 * @param content the content to convert; must not be {@code null}
 * @return the converted object
 * @throws NullPointerException if {@code content} is {@code null}
 * @throws IllegalArgumentException if {@code content} is of none of the supported types
 */
private static Object toContent(Object content) {
    if (content == null) {
        // Without this guard null fell through every instanceof check and blew up inside
        // content.getClass() while the IllegalArgumentException message was being built,
        // yielding a NullPointerException without any message.
        throw new NullPointerException("content");
    }
    if (content instanceof File) {
        File file = (File) content;
        return new DefaultFileRegion(file, 0, file.length());
    }
    if (content instanceof InputStream) {
        return new ChunkedStream((InputStream) content);
    }
    if (content instanceof ReadableByteChannel) {
        return new ChunkedNioStream((ReadableByteChannel) content);
    }
    if (content instanceof byte[]) {
        return Unpooled.wrappedBuffer((byte[]) content);
    }
    throw new IllegalArgumentException("unknown content type : " + content.getClass().getName());
}
/**
 * Sends content from the given {@link Path} using
 * {@link java.nio.channels.FileChannel#transferTo(long, long, WritableByteChannel)}
 * support. If the system supports it, the path resolves to a local file
 * system {@link File}, and neither compression nor SSL/TLS is enabled, the transfer
 * uses zero-copy to the peer; otherwise chunked read/write is used
 * (a {@link ChunkedNioFile} with a chunk size of 1024).
 * <p>It listens for any error on write and closes
 * on terminal signal (complete|error). If more than one publisher is attached
 * (multiple calls to send()) completion occurs after all publishers complete.</p>
 * <p>Note: Nesting any send* method is not supported.</p>
 *
 * @param file the file Path
 * @param position where to start
 * @param count how much to transfer
 *
 * @return A Publisher to signal successful sequence write (e.g. after "flush") or any
 * error during write
 */
default NettyOutbound sendFile(Path file, long position, long count) {
    Objects.requireNonNull(file, "filepath");
    return sendUsing(
            () -> FileChannel.open(file, StandardOpenOption.READ),
            (connection, fileChannel) -> {
                if (!ReactorNetty.mustChunkFileTransfer(connection, file)) {
                    // Direct transfer of the file region.
                    return new DefaultFileRegion(fileChannel, position, count);
                }
                // Chunked transfer needs the chunked writer installed first.
                ReactorNetty.addChunkedWriter(connection);
                try {
                    return new ChunkedNioFile(fileChannel, position, count, 1024);
                }
                catch (Exception e) {
                    throw Exceptions.propagate(e);
                }
            },
            ReactorNetty.fileCloser);
}
/**
 * Verifies that writing a {@link FileRegion} through a {@link FileRegionEncoder} produces a
 * {@link ByteBuf} on the outbound side whose content equals the bytes stored in the file.
 *
 * @throws IOException if the temporary file cannot be created or written.
 */
@Test
public void testEncode() throws IOException {
    FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
    EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
    File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
    file.deleteOnExit();
    Random random = new Random(System.currentTimeMillis());
    int dataLength = 1 << 10;
    byte[] data = new byte[dataLength];
    random.nextBytes(data);
    write(file, data);
    FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
    Assert.assertEquals(0, fileRegion.transfered());
    Assert.assertEquals(dataLength, fileRegion.count());
    Assert.assertTrue(channel.writeOutbound(fileRegion));
    ByteBuf out = (ByteBuf) channel.readOutbound();
    // Fail with a readable message instead of a NullPointerException when nothing was encoded.
    Assert.assertNotNull("Encoder should have produced a ByteBuf", out);
    try {
        byte[] arr = new byte[out.readableBytes()];
        // Copy from the reader index so the copy always matches readableBytes().
        out.getBytes(out.readerIndex(), arr);
        Assert.assertArrayEquals("Data should be identical", data, arr);
    } finally {
        // The buffer read from the channel is reference counted; release it so the test does not leak it.
        out.release();
    }
}
/**
 * Writes the current entry of the outbound buffer, dispatching on its runtime type.
 *
 * @param in the outbound buffer whose current entry is written
 * @param writeSpinCount passed through to the type-specific write method
 * @return {@code true} if the type-specific write reported success; {@code false} if not everything could
 * be written, in which case the caller should stop and wait to be notified once more can be written
 * @throws Exception if the underlying write fails
 */
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
    // The outbound buffer contains only one message or it contains a file region.
    final Object current = in.current();
    if (current instanceof ByteBuf) {
        // A false result means the network stack could not take everything right now.
        return writeBytes(in, (ByteBuf) current, writeSpinCount);
    }
    if (current instanceof DefaultFileRegion) {
        // Same contract as for buffers: false means "try again later".
        return writeFileRegion(in, (DefaultFileRegion) current, writeSpinCount);
    }
    // Should never reach here.
    throw new Error();
}
/**
 * Writes the current entry of the outbound buffer, dispatching on its runtime type.
 * @param in the collection which contains objects to write.
 * @return The value that should be decremented from the write quantum which starts at
 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
 * <ul>
 *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
 *     is encountered</li>
 *     <li>1 - if a single call to write data was made to the OS</li>
 *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
 *     data was accepted</li>
 * </ul>
 * @throws Exception If an I/O error occurs.
 */
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
    // The outbound buffer contains only one message or it contains a file region.
    final Object current = in.current();
    if (current instanceof ByteBuf) {
        return writeBytes(in, (ByteBuf) current);
    }
    // The specialised DefaultFileRegion path is tried before the general FileRegion one.
    if (current instanceof DefaultFileRegion) {
        return writeDefaultFileRegion(in, (DefaultFileRegion) current);
    }
    if (current instanceof FileRegion) {
        return writeFileRegion(in, (FileRegion) current);
    }
    // Should never reach here.
    throw new Error();
}
/**
 * Opens the given region and passes it to the native {@code sendFile} call for this socket.
 *
 * @param src the file region to send
 * @param baseOffset forwarded unchanged to the native call
 * @param offset forwarded unchanged to the native call
 * @param length forwarded unchanged to the native call
 * @return the non-negative result of the native call, or otherwise whatever {@code ioResult} returns for
 * the negative result
 * @throws IOException if opening the region or the transfer fails
 */
long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException {
    // Open the file-region as it may be created via the lazy constructor. This is needed as we directly access
    // the FileChannel field via JNI.
    src.open();

    final long written = sendFile(intValue(), src, baseOffset, offset, length);
    if (written < 0) {
        // Negative results are handed to ioResult together with the sendfile-specific exceptions.
        return ioResult("sendfile", (int) written, SENDFILE_CONNECTION_RESET_EXCEPTION,
                SENDFILE_CLOSED_CHANNEL_EXCEPTION);
    }
    return written;
}
/**
 * Opens the given region and passes it to the native {@code sendFile} call for this socket.
 *
 * @param src the file region to send
 * @param baseOffset forwarded unchanged to the native call
 * @param offset forwarded unchanged to the native call
 * @param length forwarded unchanged to the native call
 * @return the non-negative result of the native call, or otherwise whatever {@code ioResult} returns for
 * the negative result
 * @throws IOException if opening the region or the transfer fails
 */
long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException {
    // Open the file-region as it may be created via the lazy constructor. This is needed as we directly access
    // the FileChannel field via JNI.
    src.open();

    final long written = sendFile(intValue(), src, baseOffset, offset, length);
    if (written < 0) {
        // Negative results are handed to ioResult together with the sendfile-specific exceptions.
        return ioResult("sendfile", (int) written, SENDFILE_CONNECTION_RESET_EXCEPTION,
                SENDFILE_CLOSED_CHANNEL_EXCEPTION);
    }
    return written;
}
/**
 * Writes a file download response: the HTTP response head followed by the file content.
 * <p>Without an {@link SslHandler} in the pipeline the content is written as a
 * {@link DefaultFileRegion} followed by an explicit {@link LastHttpContent#EMPTY_LAST_CONTENT};
 * otherwise it is written as an {@link HttpChunkedInput} over a {@link ChunkedFile} with a chunk size
 * of 8192.</p>
 *
 * @param responseEntity the response description; its body is cast to {@link RandomAccessFile}
 * @return the future of the write that carries the end-of-content marker
 * @throws IOException if the file length cannot be read or the file name cannot be re-encoded
 */
private ChannelFuture writeFileResponse(ResponseEntity<?> responseEntity) throws IOException {
    final RandomAccessFile raf = (RandomAccessFile) responseEntity.getBody();
    final long fileLength = raf.length();

    final HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    HttpUtil.setContentLength(response, fileLength);

    final String mimetype = responseEntity.getMimetype();
    if (mimetype != null && !mimetype.trim().isEmpty()) {
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, mimetype);
    }

    final String rawFileName = responseEntity.getFileName();
    if (rawFileName != null && !rawFileName.trim().isEmpty()) {
        // The name's gb2312 bytes are reinterpreted as ISO8859-1 characters before going into the header.
        final String headerFileName = new String(rawFileName.getBytes("gb2312"), "ISO8859-1");
        response.headers().set(HttpHeaderNames.CONTENT_DISPOSITION, "attachment; filename=" + headerFileName);
    }

    if (HttpUtil.isKeepAlive(HttpContextHolder.getRequest())) {
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    final ChannelHandlerContext ctx = HttpContextHolder.getResponse().getChannelHandlerContext();
    ctx.write(response);

    if (ctx.pipeline().get(SslHandler.class) != null) {
        // HttpChunkedInput will write the end marker (LastHttpContent) for us.
        return ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
                ctx.newProgressivePromise());
    }

    ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
    // Write the end marker.
    return ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
/**
 * Write a {@link DefaultFileRegion} using {@code Native.sendfile}, making at most
 * {@code writeSpinCount} attempts.
 *
 * @param in the outbound buffer holding {@code region}; progress is reported to it and its current entry is
 * removed once the region has been transferred completely
 * @param region the {@link DefaultFileRegion} from which the bytes should be written
 * @param writeSpinCount the maximum number of {@code sendfile} calls to make
 * @return {@code true} if the region was transferred completely, {@code false} otherwise (in which case
 * the {@code EPOLLOUT} flag is set)
 * @throws Exception if the underlying write fails
 */
private boolean writeFileRegion(
        ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception {
    final long total = region.count();
    if (region.transfered() >= total) {
        // Nothing left to send.
        in.remove();
        return true;
    }

    final long fileStart = region.position();
    long totalWritten = 0;
    boolean complete = false;
    for (int spinsLeft = writeSpinCount - 1; spinsLeft >= 0; spinsLeft--) {
        final long sentSoFar = region.transfered();
        final long written =
                Native.sendfile(fd().intValue(), region, fileStart, sentSoFar, total - sentSoFar);
        if (written == 0) {
            // Nothing was accepted on this attempt; stop spinning.
            break;
        }
        totalWritten += written;
        if (region.transfered() >= total) {
            complete = true;
            break;
        }
    }

    if (totalWritten > 0) {
        in.progress(totalWritten);
    }

    if (!complete) {
        // Returned EAGAIN need to set EPOLLOUT
        setFlag(Native.EPOLLOUT);
        return false;
    }
    in.remove();
    return true;
}
/**
 * Opens the given region and passes it to the native {@code sendfile0} call.
 *
 * @param dest forwarded unchanged to the native call
 * @param src the file region to send
 * @param baseOffset forwarded unchanged to the native call
 * @param offset forwarded unchanged to the native call
 * @param length forwarded unchanged to the native call
 * @return the non-negative result of the native call, or otherwise whatever {@code ioResult} returns for
 * the negative result
 * @throws IOException if opening the region or the transfer fails
 */
public static long sendfile(
        int dest, DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException {
    // Open the file-region as it may be created via the lazy constructor. This is needed as we directly access
    // the FileChannel field directly via JNI
    src.open();

    final long written = sendfile0(dest, src, baseOffset, offset, length);
    if (written < 0) {
        // Negative results are handed to ioResult together with the sendfile-specific exception.
        return ioResult("sendfile", (int) written, CONNECTION_RESET_EXCEPTION_SENDFILE);
    }
    return written;
}
/**
 * Forwards requests for the WebSocket URI to the next handler and answers every other request
 * with the content of {@code indexHTML}.
 * <p>The file is written as a {@link DefaultFileRegion} when no {@link SslHandler} is in the
 * pipeline and as a {@link ChunkedNioFile} otherwise. The file is deliberately not closed here once
 * it has been handed to the channel: the write completes asynchronously and still needs the open
 * file channel, so closing it is left to Netty's handling of the written object.</p>
 *
 * @param ctx the handler context used for writing the response
 * @param request the received HTTP request
 * @throws Exception if the file cannot be opened or read
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
    if (wsURI.equalsIgnoreCase(request.getUri())) {
        // Retain because the request is passed on beyond this handler.
        ctx.fireChannelRead(request.retain());
        return;
    }
    if (HttpHeaders.is100ContinueExpected(request)) {
        send100Continue(ctx);
    }

    RandomAccessFile rFile = new RandomAccessFile(indexHTML, "r");
    boolean handedOff = false;
    try {
        long fileLength = rFile.length();
        HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
        boolean keepAlive = HttpHeaders.isKeepAlive(request);
        if (keepAlive) {
            response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, fileLength);
            response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
        }
        ctx.write(response);
        if (ctx.pipeline().get(SslHandler.class) == null) {
            ctx.write(new DefaultFileRegion(rFile.getChannel(), 0, fileLength));
        } else {
            ctx.write(new ChunkedNioFile(rFile.getChannel()));
        }
        // From here on the channel owns the file; closing it now (as try-with-resources did)
        // could cut off a transfer that has not finished yet.
        handedOff = true;
        ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    } finally {
        if (!handedOff) {
            // Something failed before the file was handed to the channel; do not leak it.
            rFile.close();
        }
    }
}
/**
 * Flushes this stream, sends the content of {@code this.file} as an HTTP 200 response through
 * {@code ctx}, and finally closes {@code outputStream} if it is set.
 * <p>The response consists of the response head, a {@link DefaultFileRegion} covering the whole
 * file and a {@link LastHttpContent#EMPTY_LAST_CONTENT} end marker.</p>
 *
 * @throws IOException if flushing, opening the file or reading its size fails
 */
@Override
public void close() throws IOException {
    try {
        this.flush();
        FileChannel fileChannel = new FileInputStream(this.file).getChannel();
        boolean handedOff = false;
        try {
            long fileLength = fileChannel.size();
            HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            httpResponse.headers().set(HttpConst.CONTENT_LENGTH, fileLength);
            httpResponse.headers().set(HttpConst.DATE, DateKit.gmtDate());
            httpResponse.headers().set(HttpConst.SERVER, "blade/" + Const.VERSION);
            boolean keepAlive = WebContext.request().keepAlive();
            if (keepAlive) {
                httpResponse.headers().set(HttpConst.CONNECTION, HttpConst.KEEP_ALIVE);
            }
            // Write the initial line and the header.
            ctx.write(httpResponse);
            ctx.write(new DefaultFileRegion(fileChannel, 0, fileLength), ctx.newProgressivePromise());
            // The channel now owns the file channel; it must stay open until the transfer is done.
            handedOff = true;
            // Write the end marker.
            ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        } finally {
            if (!handedOff) {
                // Failure before the region was written: close the channel here so it does not leak.
                fileChannel.close();
            }
        }
    } finally {
        if (null != outputStream) {
            outputStream.close();
        }
    }
}
/**
 * Sends a file over {@code channel}, blocking until the transfer has completed.
 * <p>A file header message carrying the file length is sent first. The {@code "frameEncoder"}
 * handler is then replaced by a {@link ChunkedWriteHandler} for the duration of the transfer and
 * put back afterwards. With an {@link SslHandler} in the pipeline the file is sent as a
 * {@link ChunkedFile}, otherwise as a {@link DefaultFileRegion}.</p>
 *
 * @param file the file to send
 * @throws Exception if the file cannot be opened, or if waiting for a write fails
 */
void sendFile(File file) throws Exception {
    long length = file.length();
    LOG.debug("Got request of sending file {} of length {}.",
              file, length);
    Message handshake = MessageBuilder.buildFileHeader(length);
    byte[] bytes = handshake.toByteArray();
    // Sends HANDSHAKE first before transferring actual file data, the
    // HANDSHAKE will tell the peer's channel to prepare for the file
    // transferring.
    channel.writeAndFlush(Unpooled.wrappedBuffer(bytes)).sync();
    ChannelHandler prepender = channel.pipeline().get("frameEncoder");
    // Removes length prepender, we don't need this handler for file
    // transferring.
    channel.pipeline().remove(prepender);
    // Adds ChunkedWriteHandler for file transferring.
    ChannelHandler cwh = new ChunkedWriteHandler();
    channel.pipeline().addLast(cwh);
    // Begins file transferring.
    RandomAccessFile raf = new RandomAccessFile(file, "r");
    // The writes are asynchronous, so wait for them (as is done for the handshake) before the
    // pipeline is changed back; otherwise the handlers could be swapped while the file is still
    // being transferred.
    if (channel.pipeline().get(SslHandler.class) != null) {
        // Zero-Copy file transferring is not supported for ssl.
        // NOTE(review): chunk size 8912 looks like a typo for 8192 - confirm before changing.
        channel.writeAndFlush(new ChunkedFile(raf, 0, length, 8912)).sync();
    } else {
        // Use Zero-Copy file transferring in non-ssl mode.
        FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, length);
        channel.writeAndFlush(region).sync();
    }
    // Restores pipeline to original state.
    channel.pipeline().remove(cwh);
    channel.pipeline().addLast("frameEncoder", prepender);
}
/**
 * Native (JNI) counterpart of the instance-level {@code sendFile} wrapper.
 * <p>NOTE(review): parameter meanings below are inferred from the parameter names and the visible
 * callers - confirm against the native implementation.</p>
 *
 * @param socketFd presumably the file descriptor of the socket to write to
 * @param src the file region to send; the wrapper opens it before making this call
 * @param baseOffset callers pass the region's {@code position()}
 * @param offset callers pass the number of bytes of the region already transferred
 * @param length callers pass the number of bytes still to be transferred
 * @return a non-negative value on success; the wrapper treats a negative value as an error code
 * @throws IOException declared by the native method
 */
private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset,
long offset, long length) throws IOException;
/**
 * Native (JNI) counterpart of the instance-level {@code sendFile} wrapper.
 * <p>NOTE(review): parameter meanings below are inferred from the parameter names and the visible
 * callers - confirm against the native implementation.</p>
 *
 * @param socketFd presumably the file descriptor of the socket to write to
 * @param src the file region to send; the wrapper opens it before making this call
 * @param baseOffset callers pass the region's {@code position()}
 * @param offset callers pass the number of bytes of the region already transferred
 * @param length callers pass the number of bytes still to be transferred
 * @return a non-negative value on success; the wrapper treats a negative value as an error code
 * @throws IOException declared by the native method
 */
private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset,
long offset, long length) throws IOException;
FileOperationEncoder(File file) {
long length = file.length();
this.content = new DefaultFileRegion(file, 0, length);
this.size = length;
}
FileOperationEncoder(File file) {
long length = file.length();
this.content = new DefaultFileRegion(file, 0, length);
this.size = length;
}
/**
 * Native (JNI) counterpart of the public {@code sendfile} wrapper.
 * <p>NOTE(review): parameter meanings below are inferred from the parameter names and the visible
 * callers - confirm against the native implementation.</p>
 *
 * @param dest presumably the file descriptor of the destination socket
 * @param src the file region to send; the wrapper opens it before making this call
 * @param baseOffset callers pass the region's {@code position()}
 * @param offset callers pass the number of bytes of the region already transferred
 * @param length callers pass the number of bytes still to be transferred
 * @return a non-negative value on success; the wrapper treats a negative value as an error code
 * @throws IOException declared by the native method
 */
private static native long sendfile0(
int dest, DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException;