下面列出了怎么用io.netty.channel.FileRegion的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Verifies that {@link FileRegionEncoder} turns an outbound {@link FileRegion} into a
 * {@link ByteBuf} whose content equals the bytes stored in the backing file.
 *
 * @throws IOException if the temporary file cannot be created or written.
 */
@Test
public void testEncode() throws IOException {
    FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
    EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
    File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
    file.deleteOnExit();
    Random random = new Random(System.currentTimeMillis());
    int dataLength = 1 << 10;
    byte[] data = new byte[dataLength];
    random.nextBytes(data);
    write(file, data);
    FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
    Assert.assertEquals(0, fileRegion.transfered());
    Assert.assertEquals(dataLength, fileRegion.count());
    Assert.assertTrue(channel.writeOutbound(fileRegion));
    ByteBuf out = (ByteBuf) channel.readOutbound();
    // Fail with a clear message instead of a NullPointerException if nothing was encoded.
    Assert.assertNotNull("Encoder should have produced a buffer", out);
    try {
        byte[] arr = new byte[out.readableBytes()];
        out.getBytes(0, arr);
        Assert.assertArrayEquals("Data should be identical", data, arr);
    } finally {
        // Whoever reads a message from the outbound queue owns it; release to avoid a buffer leak.
        out.release();
    }
}
/**
 * Verifies that {@link FileRegionEncoder} turns an outbound {@link FileRegion} into a
 * {@link ByteBuf} whose content equals the bytes stored in the backing file.
 *
 * @throws IOException if the temporary file cannot be created or written.
 */
@Test
public void testEncode() throws IOException {
    FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
    EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
    File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
    file.deleteOnExit();
    Random random = new Random(System.currentTimeMillis());
    int dataLength = 1 << 10;
    byte[] data = new byte[dataLength];
    random.nextBytes(data);
    write(file, data);
    FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
    Assert.assertEquals(0, fileRegion.transfered());
    Assert.assertEquals(dataLength, fileRegion.count());
    Assert.assertTrue(channel.writeOutbound(fileRegion));
    ByteBuf out = (ByteBuf) channel.readOutbound();
    // Fail with a clear message instead of a NullPointerException if nothing was encoded.
    Assert.assertNotNull("Encoder should have produced a buffer", out);
    try {
        byte[] arr = new byte[out.readableBytes()];
        out.getBytes(0, arr);
        Assert.assertArrayEquals("Data should be identical", data, arr);
    } finally {
        // Whoever reads a message from the outbound queue owns it; release to avoid a buffer leak.
        out.release();
    }
}
/**
 * Verifies that {@link FileRegionEncoder} turns an outbound {@link FileRegion} into a
 * {@link ByteBuf} whose content equals the bytes stored in the backing file.
 *
 * @throws IOException if the temporary file cannot be created or written.
 */
@Test
public void testEncode() throws IOException {
    FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
    EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
    File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
    file.deleteOnExit();
    Random random = new Random(System.currentTimeMillis());
    int dataLength = 1 << 10;
    byte[] data = new byte[dataLength];
    random.nextBytes(data);
    write(file, data);
    FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
    Assert.assertEquals(0, fileRegion.transfered());
    Assert.assertEquals(dataLength, fileRegion.count());
    Assert.assertTrue(channel.writeOutbound(fileRegion));
    ByteBuf out = (ByteBuf) channel.readOutbound();
    // Fail with a clear message instead of a NullPointerException if nothing was encoded.
    Assert.assertNotNull("Encoder should have produced a buffer", out);
    try {
        byte[] arr = new byte[out.readableBytes()];
        out.getBytes(0, arr);
        Assert.assertArrayEquals("Data should be identical", data, arr);
    } finally {
        // Whoever reads a message from the outbound queue owns it; release to avoid a buffer leak.
        out.release();
    }
}
/**
 * Encodes an outbound memcache object and appends the result to {@code out}.
 *
 * <p>A {@link MemcacheMessage} is rejected with an {@link IllegalStateException} while more content is
 * still expected; otherwise it is encoded through {@code encodeMessage}. Content objects
 * ({@link MemcacheContent}, {@link ByteBuf} or {@link FileRegion}) are forwarded retained, or replaced
 * by {@link Unpooled#EMPTY_BUFFER} when their length is not positive.
 *
 * @param ctx the handler context passed on to {@code encodeMessage}
 * @param msg the outbound object
 * @param out the list receiving the encoded objects
 * @throws Exception if encoding fails
 */
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
    if (msg instanceof MemcacheMessage) {
        if (expectingMoreContent) {
            throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
        }
        @SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" })
        final M message = (M) msg;
        out.add(encodeMessage(ctx, message));
    }

    final boolean isContent =
        msg instanceof MemcacheContent || msg instanceof ByteBuf || msg instanceof FileRegion;
    if (!isContent) {
        return;
    }

    out.add(contentLength(msg) > 0 ? encodeAndRetain(msg) : Unpooled.EMPTY_BUFFER);
    // Anything but the last content part means further content must follow.
    expectingMoreContent = !(msg instanceof LastMemcacheContent);
}
/**
 * Writes the given {@link FileRegion} to the channel's output stream, looping until the whole
 * region has been transferred or {@code transferTo} reports {@code -1}.
 *
 * @param region the region to write
 * @throws NotYetConnectedException if no output stream is set
 * @throws Exception if the transfer fails
 */
@Override
protected void doWriteFileRegion(FileRegion region) throws Exception {
    final OutputStream stream = this.os;
    if (stream == null) {
        throw new NotYetConnectedException();
    }
    // Lazily wrap the stream in a channel the region can transfer into.
    if (outChannel == null) {
        outChannel = Channels.newChannel(stream);
    }

    long total = 0;
    do {
        final long chunk = region.transferTo(outChannel, total);
        if (chunk == -1) {
            checkEOF(region);
            return;
        }
        total += chunk;
    } while (total < region.count());
}
/**
 * Filters an outbound message: a {@link ByteBuf} is returned as-is when direct and otherwise
 * converted via {@code newDirectBuffer}; a {@link FileRegion} passes through unchanged.
 *
 * @param msg the outbound message
 * @return the message to write
 * @throws UnsupportedOperationException for any other message type
 */
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        final ByteBuf buffer = (ByteBuf) msg;
        return buffer.isDirect() ? msg : newDirectBuffer(buffer);
    }

    if (!(msg instanceof FileRegion)) {
        throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    return msg;
}
/**
 * Writes a {@link FileRegion} through the channel's {@code byteChannel}, creating that channel on first use.
 *
 * @param in the collection which contains objects to write.
 * @param region the {@link FileRegion} from which the bytes should be written
 * @return The value that should be decremented from the write quantum which starts at
 * {@link ChannelConfig#getWriteSpinCount()}:
 * <ul>
 *     <li>0 - the region was already fully transferred, so no write was attempted</li>
 *     <li>1 - a transfer call was made and some data was accepted</li>
 *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - a transfer call was made, but no data was
 *     accepted</li>
 * </ul>
 * @throws Exception if the transfer fails
 */
private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
    // Nothing left to send: drop the region from the outbound buffer.
    if (region.transferred() >= region.count()) {
        in.remove();
        return 0;
    }

    if (byteChannel == null) {
        byteChannel = new KQueueSocketWritableByteChannel();
    }

    final long accepted = region.transferTo(byteChannel, region.transferred());
    if (accepted <= 0) {
        return WRITE_STATUS_SNDBUF_FULL;
    }

    in.progress(accepted);
    if (region.transferred() >= region.count()) {
        in.remove();
    }
    return 1;
}
/**
 * Filters an outbound message: a {@link ByteBuf} is returned as-is when direct and otherwise
 * converted via {@code newDirectBuffer}; a {@link FileRegion} passes through unchanged.
 *
 * @param msg the outbound message
 * @return the message to write
 * @throws UnsupportedOperationException for any other message type
 */
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        final ByteBuf buffer = (ByteBuf) msg;
        return buffer.isDirect() ? msg : newDirectBuffer(buffer);
    }

    if (!(msg instanceof FileRegion)) {
        throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    return msg;
}
/**
 * Attempts to write the current object of the outbound buffer, dispatching on its runtime type.
 *
 * @param in the collection which contains objects to write.
 * @return The value that should be decremented from the write quantum which starts at
 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
 * <ul>
 *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty
 *     content) is encountered</li>
 *     <li>1 - if a single call to write data was made to the OS</li>
 *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS,
 *     but no data was accepted</li>
 * </ul>
 * @throws Exception If an I/O error occurs.
 */
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
    // The outbound buffer contains only one message or it contains a file region.
    final Object current = in.current();
    if (current instanceof ByteBuf) {
        return writeBytes(in, (ByteBuf) current);
    }
    // DefaultFileRegion must be tested before the more general FileRegion.
    if (current instanceof DefaultFileRegion) {
        return writeDefaultFileRegion(in, (DefaultFileRegion) current);
    }
    if (current instanceof FileRegion) {
        return writeFileRegion(in, (FileRegion) current);
    }
    if (current instanceof SpliceOutTask) {
        if (((SpliceOutTask) current).spliceOut()) {
            in.remove();
            return 1;
        }
        return WRITE_STATUS_SNDBUF_FULL;
    }
    // Should never reach here.
    throw new Error();
}
/**
 * Encodes an outbound memcache object and appends the result to {@code out}.
 *
 * <p>A {@link MemcacheMessage} is rejected with an {@link IllegalStateException} while more content is
 * still expected; otherwise it is encoded through {@code encodeMessage}. Content objects
 * ({@link MemcacheContent}, {@link ByteBuf} or {@link FileRegion}) are forwarded retained, or replaced
 * by {@link Unpooled#EMPTY_BUFFER} when their length is not positive.
 *
 * @param ctx the handler context passed on to {@code encodeMessage}
 * @param msg the outbound object
 * @param out the list receiving the encoded objects
 * @throws Exception if encoding fails
 */
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
    if (msg instanceof MemcacheMessage) {
        if (expectingMoreContent) {
            throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
        }
        @SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" })
        final M message = (M) msg;
        out.add(encodeMessage(ctx, message));
    }

    final boolean isContent =
        msg instanceof MemcacheContent || msg instanceof ByteBuf || msg instanceof FileRegion;
    if (!isContent) {
        return;
    }

    out.add(contentLength(msg) > 0 ? encodeAndRetain(msg) : Unpooled.EMPTY_BUFFER);
    // Anything but the last content part means further content must follow.
    expectingMoreContent = !(msg instanceof LastMemcacheContent);
}
/**
 * Sends a one-way {@code CHECK_TRANSACTION_STATE} request over {@code channel}, carrying the
 * selected message as a {@link FileRegion}.
 *
 * <p>{@code selectMappedBufferResult} is released once the write completes, or immediately if
 * building or submitting the write throws. Failures are only logged.
 *
 * @param channel the channel to write to
 * @param requestHeader the header of the check request
 * @param selectMappedBufferResult the message data to transfer
 */
public void checkProducerTransactionState(
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final SelectMappedBufferResult selectMappedBufferResult) {
    final RemotingCommand checkRequest =
        RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    checkRequest.markOnewayRPC();

    try {
        final FileRegion transfer = new OneMessageTransfer(
            checkRequest.encodeHeader(selectMappedBufferResult.getSize()),
            selectMappedBufferResult);
        channel.writeAndFlush(transfer).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Release first so the buffer is freed whether or not the write succeeded.
                selectMappedBufferResult.release();
                if (future.isSuccess()) {
                    return;
                }
                log.error("invokeProducer failed,", future.cause());
            }
        });
    } catch (Throwable e) {
        log.error("invokeProducer exception", e);
        selectMappedBufferResult.release();
    }
}
/**
 * Verifies that {@link FileRegionEncoder} turns an outbound {@link FileRegion} into a
 * {@link ByteBuf} whose content equals the bytes stored in the backing file.
 *
 * @throws IOException if the temporary file cannot be created or written.
 */
@Test
public void testEncode() throws IOException {
    FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
    EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
    File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
    file.deleteOnExit();
    Random random = new Random(System.currentTimeMillis());
    int dataLength = 1 << 10;
    byte[] data = new byte[dataLength];
    random.nextBytes(data);
    write(file, data);
    FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
    Assert.assertEquals(0, fileRegion.transfered());
    Assert.assertEquals(dataLength, fileRegion.count());
    Assert.assertTrue(channel.writeOutbound(fileRegion));
    ByteBuf out = (ByteBuf) channel.readOutbound();
    // Fail with a clear message instead of a NullPointerException if nothing was encoded.
    Assert.assertNotNull("Encoder should have produced a buffer", out);
    try {
        byte[] arr = new byte[out.readableBytes()];
        out.getBytes(0, arr);
        Assert.assertArrayEquals("Data should be identical", data, arr);
    } finally {
        // Whoever reads a message from the outbound queue owns it; release to avoid a buffer leak.
        out.release();
    }
}
/**
 * Sends a one-way {@code CHECK_TRANSACTION_STATE} request over {@code channel}, carrying the
 * selected message as a {@link FileRegion}.
 *
 * <p>{@code selectMappedBufferResult} is released once the write completes, or immediately if
 * building or submitting the write throws. Failures are only logged.
 *
 * @param channel the channel to write to
 * @param requestHeader the header of the check request
 * @param selectMappedBufferResult the message data to transfer
 */
public void checkProducerTransactionState(
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final SelectMappedBufferResult selectMappedBufferResult) {
    final RemotingCommand checkRequest =
        RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    checkRequest.markOnewayRPC();

    try {
        final FileRegion transfer = new OneMessageTransfer(
            checkRequest.encodeHeader(selectMappedBufferResult.getSize()),
            selectMappedBufferResult);
        channel.writeAndFlush(transfer).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Release first so the buffer is freed whether or not the write succeeded.
                selectMappedBufferResult.release();
                if (future.isSuccess()) {
                    return;
                }
                log.error("invokeProducer failed,", future.cause());
            }
        });
    } catch (Throwable e) {
        log.error("invokeProducer exception", e);
        selectMappedBufferResult.release();
    }
}
/**
 * Verifies that {@link FileRegionEncoder} turns an outbound {@link FileRegion} into a
 * {@link ByteBuf} whose content equals the bytes stored in the backing file.
 *
 * @throws IOException if the temporary file cannot be created or written.
 */
@Test
public void testEncode() throws IOException {
    FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
    EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
    File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
    file.deleteOnExit();
    Random random = new Random(System.currentTimeMillis());
    int dataLength = 1 << 10;
    byte[] data = new byte[dataLength];
    random.nextBytes(data);
    write(file, data);
    FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
    Assert.assertEquals(0, fileRegion.transfered());
    Assert.assertEquals(dataLength, fileRegion.count());
    Assert.assertTrue(channel.writeOutbound(fileRegion));
    ByteBuf out = (ByteBuf) channel.readOutbound();
    // Fail with a clear message instead of a NullPointerException if nothing was encoded.
    Assert.assertNotNull("Encoder should have produced a buffer", out);
    try {
        byte[] arr = new byte[out.readableBytes()];
        out.getBytes(0, arr);
        Assert.assertArrayEquals("Data should be identical", data, arr);
    } finally {
        // Whoever reads a message from the outbound queue owns it; release to avoid a buffer leak.
        out.release();
    }
}
/**
 * Estimates the size of a message in bytes.
 *
 * <p>Buffers and buffer holders report their readable bytes, a {@link FileRegion} counts as 0,
 * a {@link PayloadHolder} reports its own size, and every other type yields {@code unknownSize}.
 *
 * @param msg the message to measure
 * @return the estimated size
 */
@Override
public int size(Object msg) {
    final int estimate;
    if (msg instanceof ByteBuf) {
        estimate = ((ByteBuf) msg).readableBytes();
    } else if (msg instanceof ByteBufHolder) {
        estimate = ((ByteBufHolder) msg).content().readableBytes();
    } else if (msg instanceof FileRegion) {
        estimate = 0;
    } else if (msg instanceof PayloadHolder) {
        // jupiter object
        estimate = ((PayloadHolder) msg).size();
    } else {
        estimate = unknownSize;
    }
    return estimate;
}
/**
 * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that
 * can be handled by this encoder.
 *
 * <p>The {@link FileRegion} is drained into {@code out} through an in-memory
 * {@link WritableByteChannel} adapter, repeating {@code transferTo} until the region reports that
 * all {@code count()} bytes have been transferred.
 *
 * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link
 * io.netty.handler.codec.MessageToByteEncoder} belongs to
 * @param msg the message to encode
 * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written
 * @throws Exception is thrown if an error occurs
 */
@Override
protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception {
    WritableByteChannel writableByteChannel = new WritableByteChannel() {
        @Override
        public int write(ByteBuffer src) throws IOException {
            // The channel contract requires the number of bytes consumed from src;
            // returning the destination's capacity would corrupt the caller's transfer accounting.
            int written = src.remaining();
            out.writeBytes(src);
            return written;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() throws IOException {
        }
    };

    long toTransfer = msg.count();
    while (true) {
        long transferred = msg.transfered();
        if (toTransfer - transferred <= 0) {
            break;
        }
        msg.transferTo(writableByteChannel, transferred);
    }
}
/**
 * Looks up a single message by the offset in the request header and streams it back to the caller.
 *
 * <p>When the message is found it is written to the channel as a {@link FileRegion} and
 * {@code null} is returned; the selected buffer is released when the write completes or when
 * submitting it throws. When it is not found, a {@code SYSTEM_ERROR} response is returned instead.
 *
 * @param ctx the handler context whose channel receives the message
 * @param request the incoming request
 * @return {@code null} if the message was written directly, otherwise the error response
 * @throws RemotingCommandException if the request header cannot be decoded
 */
public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ViewMessageRequestHeader requestHeader =
        (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
    response.setOpaque(request.getOpaque());

    final SelectMappedBufferResult selectMappedBufferResult =
        this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
    if (selectMappedBufferResult == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
        return response;
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    try {
        final FileRegion transfer = new OneMessageTransfer(
            response.encodeHeader(selectMappedBufferResult.getSize()),
            selectMappedBufferResult);
        ctx.channel().writeAndFlush(transfer).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Release first so the buffer is freed whether or not the write succeeded.
                selectMappedBufferResult.release();
                if (future.isSuccess()) {
                    return;
                }
                log.error("Transfer one message from page cache failed, ", future.cause());
            }
        });
    } catch (Throwable e) {
        log.error("", e);
        selectMappedBufferResult.release();
    }
    // The response travels with the file region, so there is nothing for the caller to send.
    return null;
}
/**
 * Determine the content length of the given object.
 *
 * <p>For a {@link FileRegion} the {@code long} count is saturated at {@link Integer#MAX_VALUE}
 * rather than truncated, so a region larger than 2 GiB never reports zero or a negative length.
 *
 * @param msg the object to determine the length of.
 * @return the determined content length.
 * @throws IllegalStateException if {@code msg} is none of the supported content types.
 */
private static int contentLength(Object msg) {
    if (msg instanceof MemcacheContent) {
        return ((MemcacheContent) msg).content().readableBytes();
    }
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof FileRegion) {
        // A plain (int) cast wraps around for counts above Integer.MAX_VALUE; clamp instead.
        return (int) Math.min(Integer.MAX_VALUE, ((FileRegion) msg).count());
    }
    throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
}
/**
 * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that
 * can be handled by this encoder.
 *
 * <p>The {@link FileRegion} is drained into {@code out} through an in-memory
 * {@link WritableByteChannel} adapter, repeating {@code transferTo} until the region reports that
 * all {@code count()} bytes have been transferred.
 *
 * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link
 * io.netty.handler.codec.MessageToByteEncoder} belongs to
 * @param msg the message to encode
 * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written
 * @throws Exception is thrown if an error occurs
 */
@Override
protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception {
    WritableByteChannel writableByteChannel = new WritableByteChannel() {
        @Override
        public int write(ByteBuffer src) throws IOException {
            // The channel contract requires the number of bytes consumed from src;
            // returning the destination's capacity would corrupt the caller's transfer accounting.
            int written = src.remaining();
            out.writeBytes(src);
            return written;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() throws IOException {
        }
    };

    long toTransfer = msg.count();
    while (true) {
        long transferred = msg.transfered();
        if (toTransfer - transferred <= 0) {
            break;
        }
        msg.transferTo(writableByteChannel, transferred);
    }
}
/**
 * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that
 * can be handled by this encoder.
 *
 * <p>The {@link FileRegion} is drained into {@code out} through an in-memory
 * {@link WritableByteChannel} adapter, repeating {@code transferTo} until the region reports that
 * all {@code count()} bytes have been transferred.
 *
 * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link
 * io.netty.handler.codec.MessageToByteEncoder} belongs to
 * @param msg the message to encode
 * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written
 * @throws Exception is thrown if an error occurs
 */
@Override
protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception {
    WritableByteChannel writableByteChannel = new WritableByteChannel() {
        @Override
        public int write(ByteBuffer src) throws IOException {
            // The channel contract requires the number of bytes consumed from src;
            // returning the destination's capacity would corrupt the caller's transfer accounting.
            int written = src.remaining();
            out.writeBytes(src);
            return written;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() throws IOException {
        }
    };

    long toTransfer = msg.count();
    while (true) {
        long transferred = msg.transfered();
        if (toTransfer - transferred <= 0) {
            break;
        }
        msg.transferTo(writableByteChannel, transferred);
    }
}
/**
 * Produces the object to write for a piece of content and retains it, based on its runtime type:
 * a {@link ByteBuf} or {@link FileRegion} is retained itself, a {@link MemcacheContent}
 * contributes its retained content buffer.
 *
 * @param msg the object to encode.
 * @return the retained object to write.
 * @throws IllegalStateException if {@code msg} is none of the supported types.
 */
private static Object encodeAndRetain(Object msg) {
    final Object retained;
    if (msg instanceof ByteBuf) {
        retained = ((ByteBuf) msg).retain();
    } else if (msg instanceof MemcacheContent) {
        retained = ((MemcacheContent) msg).content().retain();
    } else if (msg instanceof FileRegion) {
        retained = ((FileRegion) msg).retain();
    } else {
        throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
    }
    return retained;
}
/**
 * Determines the content length of an outbound object: the readable bytes of an
 * {@link HttpContent} or {@link ByteBuf}, or the byte count of a {@link FileRegion}.
 *
 * @param msg the object to measure
 * @return the content length in bytes
 * @throws IllegalStateException if {@code msg} is none of the supported types
 */
private static long contentLength(Object msg) {
    final long length;
    if (msg instanceof HttpContent) {
        length = ((HttpContent) msg).content().readableBytes();
    } else if (msg instanceof ByteBuf) {
        length = ((ByteBuf) msg).readableBytes();
    } else if (msg instanceof FileRegion) {
        length = ((FileRegion) msg).count();
    } else {
        throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
    }
    return length;
}
/**
 * Writes a chunked response followed by {@code FILE_REGION} through an {@link HttpResponseEncoder}
 * and checks each outbound piece in order: the response head, the chunk-size line, the region
 * itself, the chunk terminator, and finally the last-chunk marker.
 */
@Test
public void testLargeFileRegionChunked() throws Exception {
    final EmbeddedChannel channel = new EmbeddedChannel(new HttpResponseEncoder());

    // Response head announcing chunked transfer encoding.
    final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
    assertTrue(channel.writeOutbound(response));
    final ByteBuf head = channel.readOutbound();
    final String expectedHead = "HTTP/1.1 200 OK\r\n" + HttpHeaderNames.TRANSFER_ENCODING + ": " +
        HttpHeaderValues.CHUNKED + "\r\n\r\n";
    assertEquals(expectedHead, head.toString(CharsetUtil.US_ASCII));
    head.release();

    // The region is framed as: chunk-size line, the region object itself, trailing CRLF.
    assertTrue(channel.writeOutbound(FILE_REGION));
    final ByteBuf chunkSize = channel.readOutbound();
    assertEquals("80000000\r\n", chunkSize.toString(CharsetUtil.US_ASCII));
    chunkSize.release();

    final FileRegion passedThrough = channel.readOutbound();
    assertSame(FILE_REGION, passedThrough);
    passedThrough.release();

    final ByteBuf chunkEnd = channel.readOutbound();
    assertEquals("\r\n", chunkEnd.toString(CharsetUtil.US_ASCII));
    chunkEnd.release();

    // Terminating zero-length chunk.
    assertTrue(channel.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT));
    final ByteBuf lastChunk = channel.readOutbound();
    assertEquals("0\r\n\r\n", lastChunk.toString(CharsetUtil.US_ASCII));
    lastChunk.release();

    assertFalse(channel.finish());
}
/**
 * Accepts only {@link ByteBuf} and {@link FileRegion} messages and returns them unchanged.
 *
 * @param msg the outbound message
 * @return {@code msg} itself
 * @throws UnsupportedOperationException for any other message type
 */
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
    final boolean supported = msg instanceof ByteBuf || msg instanceof FileRegion;
    if (!supported) {
        throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    return msg;
}
/**
 * Drains the outbound buffer, writing each entry until none is left.
 *
 * <p>A {@link ByteBuf} is written repeatedly until it has no readable bytes; a {@link FileRegion}
 * is handed to {@code doWriteFileRegion}. Progress is reported to the outbound buffer in both
 * cases. Any other entry is removed with an {@link UnsupportedOperationException}.
 *
 * @param in the outbound buffer to drain
 * @throws Exception if a write fails
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object msg;
    // A null current entry means there is nothing left to write.
    while ((msg = in.current()) != null) {
        if (msg instanceof ByteBuf) {
            final ByteBuf buf = (ByteBuf) msg;
            int remaining = buf.readableBytes();
            while (remaining > 0) {
                doWriteBytes(buf);
                final int left = buf.readableBytes();
                in.progress(remaining - left);
                remaining = left;
            }
            in.remove();
            continue;
        }

        if (msg instanceof FileRegion) {
            final FileRegion region = (FileRegion) msg;
            final long before = region.transferred();
            doWriteFileRegion(region);
            in.progress(region.transferred() - before);
            in.remove();
            continue;
        }

        in.remove(new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg)));
    }
}
/**
 * Attempts to write the current object of the outbound buffer, dispatching on its runtime type.
 *
 * @param in the collection which contains objects to write.
 * @return The value that should be decremented from the write quantum which starts at
 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
 * <ul>
 *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty
 *     content) is encountered</li>
 *     <li>1 - if a single call to write data was made to the OS</li>
 *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS,
 *     but no data was accepted</li>
 * </ul>
 * @throws Exception If an I/O error occurs.
 */
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
    // The outbound buffer contains only one message or it contains a file region.
    final Object current = in.current();
    if (current instanceof ByteBuf) {
        return writeBytes(in, (ByteBuf) current);
    }
    // DefaultFileRegion must be tested before the more general FileRegion.
    if (current instanceof DefaultFileRegion) {
        return writeDefaultFileRegion(in, (DefaultFileRegion) current);
    }
    if (current instanceof FileRegion) {
        return writeFileRegion(in, (FileRegion) current);
    }
    // Should never reach here.
    throw new Error();
}
/**
 * Filters an outbound message: a {@link ByteBuf} is copied into a direct buffer when
 * {@code UnixChannelUtil.isBufferCopyNeededForWrite} says so and returned as-is otherwise; a
 * {@link FileRegion} passes through unchanged.
 *
 * @param msg the outbound message
 * @return the message to write
 * @throws UnsupportedOperationException for any other message type
 */
@Override
protected Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        final ByteBuf buffer = (ByteBuf) msg;
        if (UnixChannelUtil.isBufferCopyNeededForWrite(buffer)) {
            return newDirectBuffer(buffer);
        }
        return buffer;
    }

    if (!(msg instanceof FileRegion)) {
        throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    return msg;
}
/**
 * Filters an outbound message: a {@link ByteBuf} is copied into a direct buffer when
 * {@code UnixChannelUtil.isBufferCopyNeededForWrite} says so and returned as-is otherwise; a
 * {@link FileRegion} or {@link SpliceOutTask} passes through unchanged.
 *
 * @param msg the outbound message
 * @return the message to write
 * @throws UnsupportedOperationException for any other message type
 */
@Override
protected Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        final ByteBuf buffer = (ByteBuf) msg;
        if (UnixChannelUtil.isBufferCopyNeededForWrite(buffer)) {
            return newDirectBuffer(buffer);
        }
        return buffer;
    }

    final boolean passThrough = msg instanceof FileRegion || msg instanceof SpliceOutTask;
    if (!passThrough) {
        throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    return msg;
}
/**
 * Returns the default {@link SizeEstimator} to use.
 *
 * <p>The estimator reports {@code count()} for a {@link FileRegion}; for anything else it reports
 * how far {@code before} exceeds {@code after}, or 0 when it does not.
 *
 * @return The default {@link SizeEstimator} to use.
 */
static SizeEstimator defaultEstimator() {
    return (written, before, after) -> {
        if (!(written instanceof FileRegion)) {
            return before > after ? before - after : 0;
        }
        return ((FileRegion) written).count();
    };
}
/**
 * Looks up a single message by the offset in the request header and streams it back to the caller.
 *
 * <p>When the message is found it is written to the channel as a {@link FileRegion} and
 * {@code null} is returned; the selected buffer is released when the write completes or when
 * submitting it throws. When it is not found, a {@code SYSTEM_ERROR} response is returned instead.
 *
 * @param ctx the handler context whose channel receives the message
 * @param request the incoming request
 * @return {@code null} if the message was written directly, otherwise the error response
 * @throws RemotingCommandException if the request header cannot be decoded
 */
public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ViewMessageRequestHeader requestHeader =
        (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
    response.setOpaque(request.getOpaque());

    final SelectMappedBufferResult selectMappedBufferResult =
        this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
    if (selectMappedBufferResult == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
        return response;
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    try {
        final FileRegion transfer = new OneMessageTransfer(
            response.encodeHeader(selectMappedBufferResult.getSize()),
            selectMappedBufferResult);
        ctx.channel().writeAndFlush(transfer).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Release first so the buffer is freed whether or not the write succeeded.
                selectMappedBufferResult.release();
                if (future.isSuccess()) {
                    return;
                }
                log.error("Transfer one message from page cache failed, ", future.cause());
            }
        });
    } catch (Throwable e) {
        log.error("", e);
        selectMappedBufferResult.release();
    }
    // The response travels with the file region, so there is nothing for the caller to send.
    return null;
}