下面列出了 java.nio.ByteBuffer#compact() 的实例代码，您也可以点击链接到 GitHub 查看源代码，或者在右侧发表评论。
/**
 * Reads the local test file through a spied {@link ReadableByteChannel} into a
 * 10-byte direct buffer and runs {@code checkState} after every successful read.
 * Once the channel stops returning data, the rate-limiter flag must be cleared.
 */
@Test
public void testNIOReadWithDirectBuffer() throws Exception {
    final FileRef localRef = getLocalFileRef(testDir, RATE_LIMIT);
    final long totalSize = Files.size(Paths.get(FileRefTestUtil.getSourceFilePath(testDir)));
    try (ReadableByteChannel channel =
             Mockito.spy(localRef.createInputStream(context, ReadableByteChannel.class))) {
        final AtomicInteger requestedBytes = new AtomicInteger(-1);
        final AtomicBoolean limiterAcquired = new AtomicBoolean(false);
        intercept(channel, requestedBytes, limiterAcquired);
        Assert.assertEquals(totalSize, getRemainingStreamSize(channel));

        final ByteBuffer chunk = ByteBuffer.allocateDirect(10);
        long bytesLeft = totalSize;
        int freeSpace = chunk.remaining();
        for (int n = channel.read(chunk); n > 0; n = channel.read(chunk)) {
            bytesLeft -= n;
            checkState(channel, bytesLeft, freeSpace, requestedBytes, limiterAcquired);
            // Reset the interception state before the next read.
            requestedBytes.set(-1);
            limiterAcquired.set(false);
            // compact() is applied directly after the read (no flip in between);
            // the free space for the next read is whatever remains afterwards.
            chunk.compact();
            freeSpace = chunk.remaining();
        }
        Assert.assertFalse(limiterAcquired.get());
    }
}
/**
 * Processes the data in the connection's read buffer one message at a time:
 * each pass parses a multibulk request, records how far parsing got on the
 * connection, runs the command when parsing returned {@code REDIS_OK}, and
 * queues any reply produced for writing.
 *
 * @param conn the connection whose read buffer is processed
 * @return always {@code true}
 * @throws IOException declared by the handler contract
 */
@Override
public boolean doReadHandler(Connection conn) throws IOException {
    final RedisMessage message =
        new RedisMessage(conn.getReadDataBuffer(), conn.getLastMessagePos());
    while (message.position() < message.limit()) {
        // Clear any reply left over from the previous pass.
        message.replay(null);
        final int parseStatus = processMultibulkBuffer(message);

        // Keep the connection's buffer position and bookmark in step with the parser.
        conn.getReadDataBuffer().position(message.position());
        conn.setLastMessagePos(message.position());

        if (parseStatus == REDIS_OK) {
            processCommand(conn,message);
        }
        if (message.replay() == null) {
            continue;
        }
        final ByteBuffer outgoing = ByteBuffer.wrap(message.replay().getBytes());
        // compact() on a freshly wrapped array moves position to the end of the data.
        outgoing.compact();
        conn.addWriteQueue(outgoing);
        conn.enableWrite(true);
    }
    return true;
}
/**
 * Decodes {@code compressed_data} through a gzip-decoding supplier into a
 * 512-byte direct buffer and checks that the concatenated output equals
 * {@code plaintext}. Every load must fill the buffer unless the decoder
 * has reached end of input.
 */
@Test
public void read_using_direct_buffer() throws Exception {
    final BufferSupplier source = new GzipDecodingBufferSupplier(new ByteArrayBufferSupplier(compressed_data));
    final ByteBuffer chunk = ByteBuffer.allocateDirect(512);
    final ByteArrayOutputStream collected = new ByteArrayOutputStream(plaintext.length);

    while (!source.atEof()) {
        source.load(chunk);
        assertTrue("decoder must entirely fill buffer", source.atEof() || !chunk.hasRemaining());

        // Drain everything that was loaded into the output stream.
        chunk.flip();
        final byte[] drained = new byte[chunk.remaining()];
        chunk.get(drained);
        collected.write(drained, 0, drained.length);

        // Make the buffer ready for the next load.
        chunk.compact();
    }

    collected.close();
    assertArrayEquals(plaintext, collected.toByteArray());
}
/**
 * Completion callback for a read on a client session. Forwards the received
 * bytes to {@code session.write}, then asks the session to check whether one
 * receive from the server is done. A new receive is always scheduled
 * afterwards, unless the session was closed because of a negative count.
 *
 * @param receivedCount number of bytes received; negative means the server
 *                      closed the socket channel or the network failed
 * @param session       the session that owns the read buffer
 */
@Override
public void completed(Integer receivedCount, LongClientSession session) {
    if (receivedCount < 0) {
        // Server closed the socket channel, or a network issue occurred.
        session.close();
        return;
    }
    try {
        final ByteBuffer readBuffer = session.getByteBuffer();
        if (receivedCount <= 0) {
            return; // nothing arrived; the finally block still re-arms the receive
        }
        readBuffer.flip();
        // After flip() position is 0, so remaining() equals limit().
        final byte[] payload = new byte[readBuffer.remaining()];
        readBuffer.get(payload);
        session.write(payload);
        readBuffer.compact();
        // Check whether one receive from the server is done.
        session.checkOneReceiveDone(receivedCount, payload);
    } finally {
        session.receive();
    }
}
/**
 * Completion callback for a read on a server session. Forwards the received
 * bytes to {@code session.write}, then asks the session to check whether one
 * client request is done. A new receive is always scheduled afterwards,
 * unless the session was closed because of a negative count.
 *
 * @param receivedCount number of bytes received; a negative value closes the session
 * @param session       the session that owns the read buffer
 */
@Override
public void completed(Integer receivedCount, ServerSession session) {
    if (receivedCount < 0) {
        session.close();
        return;
    }
    try {
        final ByteBuffer readBuffer = session.getByteBuffer();
        if (receivedCount <= 0) {
            return; // nothing arrived; the finally block still re-arms the receive
        }
        readBuffer.flip();
        // After flip() position is 0, so remaining() equals limit().
        final byte[] payload = new byte[readBuffer.remaining()];
        readBuffer.get(payload);
        session.write(payload);
        readBuffer.compact();
        // Check whether one client request is done.
        session.checkOneReceiveDone(receivedCount, payload);
    } finally {
        session.receive();
    }
}
/**
 * Flips the buffer and calls {@code writeBuffer} repeatedly until the buffer
 * has no remaining bytes. On exit the buffer is put back into fill mode:
 * cleared when fully written, or compacted when a write failed part-way so
 * the unwritten bytes are preserved.
 *
 * @param buffer the buffer to flush
 * @throws IOException if {@code writeBuffer} fails
 */
protected void flushBufferBlocking(final ByteBuffer buffer)
        throws IOException {
    buffer.flip();
    try {
        // At least one write is always attempted, even for an empty buffer.
        writeBuffer(buffer);
        while (buffer.hasRemaining()) {
            writeBuffer(buffer);
        }
    } finally {
        if (!buffer.hasRemaining()) {
            buffer.clear();
        } else {
            // Only reachable when a write threw: keep what was not written.
            buffer.compact();
        }
    }
}
/**
 * Copies the whole content of a stream into a file, creating the file's
 * parent directories first when it has any.
 * <p>
 * The input stream is wrapped in a channel that is closed when the copy
 * ends, which also closes the stream.
 *
 * @param aIS         source stream; read until end of stream and then closed
 * @param aTargetFile file to write; created or overwritten
 * @throws IOException if reading the stream or writing the file fails
 */
public static void copy(InputStream aIS, File aTargetFile) throws IOException
{
    // getParentFile() returns null for a path without a parent component
    // (e.g. new File("out.bin")); there is nothing to create in that case.
    final File parentDir = aTargetFile.getParentFile();
    if (parentDir != null) {
        parentDir.mkdirs();
    }

    try (
        ReadableByteChannel in = newChannel(aIS);
        WritableByteChannel out = newChannel(new FileOutputStream(aTargetFile))
    ) {
        final ByteBuffer buffer = allocateDirect(8192);
        while (in.read(buffer) != -1) {
            buffer.flip();
            out.write(buffer);
            // Keep any bytes a partial write left behind at the front of the buffer.
            buffer.compact();
        }
        // End of stream leaves the buffer in fill mode; drain what is still in it.
        buffer.flip();
        while (buffer.hasRemaining()) {
            out.write(buffer);
        }
    }
}
/**
 * Writes the connection's pending network output to its channel, compacts the
 * output buffer, lets the connection process its application output buffer,
 * and re-registers the channel for read events when no network output is
 * left. The connection is closed if the write reports a negative count or
 * any step throws an {@code IOException}.
 *
 * @param conn_ the connection that became writable; cast to {@code TCPConnection}
 */
private void handleWritable(Connection conn_) {
    final TCPConnection tcp = (TCPConnection) conn_;
    logger.log(Level.FINEST,"handleWritable():");

    final ByteBuffer pending = tcp.connection_buffers.netOutBuffer();
    pending.flip();
    try {
        final int written = tcp.channel.write(pending);
        if (written < 0) {
            closeConnection(tcp);
            return;
        }
        // Move whatever was not written back to the front of the buffer.
        pending.compact();
        tcp.processAppOutBuffer();
        if (tcp.hasNetOutput()) {
            return;
        }
        tcp.channel.register(selector, SelectionKey.OP_READ, tcp);
    } catch (java.io.IOException ex) {
        closeConnection(tcp);
    }
}
/**
 * Sets the buffer position from the current write address
 * ({@code addrPosition - baseAddress}), flips it, and calls
 * {@code writeBuffer} until nothing remains. On exit the buffer is cleared
 * when fully written, or compacted when a write failed part-way, and
 * {@code resetBufferPositions()} is always invoked.
 *
 * @param buffer the buffer to flush
 * @throws IOException if {@code writeBuffer} fails
 */
protected void flushBufferBlocking(final ByteBuffer buffer)
        throws IOException {
    final int writtenSoFar = (int) (this.addrPosition - this.baseAddress);
    buffer.position(writtenSoFar);
    buffer.flip();
    try {
        // At least one write is always attempted, even for an empty buffer.
        writeBuffer(buffer);
        while (buffer.hasRemaining()) {
            writeBuffer(buffer);
        }
    } finally {
        if (!buffer.hasRemaining()) {
            buffer.clear();
        } else {
            // Only reachable when a write threw: keep what was not written.
            buffer.compact();
        }
        resetBufferPositions();
    }
}
/**
 * Channel copy method 1. Copies everything from the source channel to the
 * destination channel until the source reports EOF. The temporary buffer is
 * compacted after each write so that a partial write keeps its unwritten
 * remainder; this may copy data inside the buffer but keeps the number of
 * system calls low. A final drain loop sends whatever is still buffered when
 * EOF is reached.
 * <br>
 * Source: Java NIO, page 60
 *
 * @param aSrc
 *        Source channel. May not be <code>null</code>. Is not closed after
 *        the operation.
 * @param aDest
 *        Destination channel. May not be <code>null</code>. Is not closed
 *        after the operation.
 * @return The number of bytes written.
 */
@Nonnegative
private static long _channelCopy1 (@Nonnull @WillNotClose final ReadableByteChannel aSrc,
                                   @Nonnull @WillNotClose final WritableByteChannel aDest) throws IOException
{
  final ByteBuffer aTmp = ByteBuffer.allocateDirect (16 * 1024);
  long nTotal = 0;
  for (;;)
  {
    if (aSrc.read (aTmp) == -1)
      break;
    // Switch to drain mode and write; the write may block
    aTmp.flip ();
    final int nWritten = aDest.write (aTmp);
    nTotal += nWritten;
    // Shift an unwritten remainder down; equals clear() when the buffer is empty
    aTmp.compact ();
  }
  // EOF leaves the buffer in fill state - drain what is left
  aTmp.flip ();
  while (aTmp.hasRemaining ())
  {
    final int nWritten = aDest.write (aTmp);
    nTotal += nWritten;
  }
  return nTotal;
}
/**
 * Pushes back the given data. This should only be used by transfer coding
 * handlers that have read past the end of the request when handling
 * pipelined requests.
 * <p>
 * When extra bytes are already stored, the pushed-back data is placed in
 * front of them, either by reusing the pushed-back buffer or by merging both
 * into a newly allocated array.
 *
 * @param unget The buffer to push back
 */
public void ungetRequestBytes(final PooledByteBuffer unget) {
    final PooledByteBuffer pending = getExtraBytes();
    if (pending == null) {
        setExtraBytes(unget);
        return;
    }

    final ByteBuffer pendingData = pending.getBuffer();
    final ByteBuffer pushedBack = unget.getBuffer();

    // limit - remaining is the buffer's current position
    if (pushedBack.position() > pendingData.remaining()) {
        // Append the existing data after the data being pushed back.
        pushedBack.compact();
        pushedBack.put(pendingData);
        pushedBack.flip();
        pending.close();
        setExtraBytes(unget);
        return;
    }

    // TODO: this is horrible, but should not happen often
    final int pushedLength = pushedBack.remaining();
    final byte[] merged = new byte[pushedLength + pendingData.remaining()];
    pushedBack.get(merged, 0, pushedLength);
    pendingData.get(merged, pushedLength, pendingData.remaining());
    pending.close();
    unget.close();
    setExtraBytes(new ImmediatePooledByteBuffer(ByteBuffer.wrap(merged)));
}
/**
 * Copies all bytes from {@code src} to {@code dest} through a 16 KiB direct
 * buffer. Neither channel is closed.
 *
 * @param src  channel to read from until it reports end of stream
 * @param dest channel to write to
 * @throws IOException if reading or writing fails
 */
private static void fastChannelCopy(final ReadableByteChannel src, final WritableByteChannel dest)
        throws IOException {
    final ByteBuffer staging = ByteBuffer.allocateDirect(16 * 1024);
    for (int n = src.read(staging); n != -1; n = src.read(staging)) {
        staging.flip();
        dest.write(staging);
        // Keep whatever a partial write left behind at the front of the buffer.
        staging.compact();
    }
    // End of stream leaves the buffer in fill mode; drain what is still in it.
    staging.flip();
    while (staging.hasRemaining()) {
        dest.write(staging);
    }
}
/**
 * Read loop: while the connection reports itself connected, reads from its
 * channel into the buffer, decodes the available packets and compacts the
 * buffer. However the loop ends, the finally block logs a message and
 * triggers a reconnect.
 */
public void run() {
    try {
        this.buffer.clear();
        while (connection.isConnected()) {
            // Grow the buffer by 1k whenever the remaining space is insufficient.
            final ByteBuffer incoming = buffer.checkCapacity(1024).nioBuffer();
            if (read(connection.getChannel(), incoming)) {
                incoming.flip();
                decodePacket(incoming);
                // Keep undecoded bytes for the next round.
                incoming.compact();
            } else {
                break;
            }
        }
    } finally {
        logger.w("read an error, do reconnect!!!");
        connection.reconnect();
    }
}
/**
 * Transfer as much as possible from source to dest buffer.
 * <p>
 * With <code>bNeedsFlip</code> set, a source buffer at position 0 is left
 * untouched; otherwise it is flipped, transferred, and then compacted (if
 * bytes remain) or cleared. Without <code>bNeedsFlip</code> the source is
 * transferred as is, provided it has remaining bytes.
 *
 * @param aSrcBuffer
 *        Source buffer. May not be <code>null</code>.
 * @param aDstBuffer
 *        Destination buffer. May not be <code>null</code>.
 * @param bNeedsFlip
 *        whether or not to flip src
 * @return The amount of data transferred. Always &ge; 0.
 */
@Nonnegative
public static int transfer (@Nonnull final ByteBuffer aSrcBuffer,
                            @Nonnull final ByteBuffer aDstBuffer,
                            final boolean bNeedsFlip)
{
  ValueEnforcer.notNull (aSrcBuffer, "SourceBuffer");
  ValueEnforcer.notNull (aDstBuffer, "DestinationBuffer");

  if (!bNeedsFlip)
  {
    // Source is already in drain mode
    return aSrcBuffer.hasRemaining () ? _doTransfer (aSrcBuffer, aDstBuffer) : 0;
  }

  if (aSrcBuffer.position () <= 0)
  {
    // Position 0: the source is not flipped or transferred
    return 0;
  }

  aSrcBuffer.flip ();
  final int nTransferred = _doTransfer (aSrcBuffer, aDstBuffer);
  // Put the source back into fill mode, preserving untransferred bytes
  if (aSrcBuffer.hasRemaining ())
    aSrcBuffer.compact ();
  else
    aSrcBuffer.clear ();
  return nTransferred;
}
/**
 * Reads from the socket until {@code inputBuffer} holds at least {@code len}
 * readable bytes. In {@code ONLY_WHEN_EMPTY} mode nothing is read when the
 * buffer already holds that many bytes.
 *
 * @param inputBuffer buffer in read mode; compacted before and flipped after each read
 * @param len         minimum number of readable bytes required on return
 * @param mode        read mode; see above for {@code ONLY_WHEN_EMPTY}
 * @throws IOException if the socket is absent, the device is not connected,
 *                     or a read returns zero or fewer bytes
 */
@Override
protected void getAtLeastBytes(ByteBuffer inputBuffer, int len, ReadMode mode) throws IOException {
    final boolean alreadySatisfied = inputBuffer.remaining() >= len;
    if (mode == ReadMode.ONLY_WHEN_EMPTY && alreadySatisfied) {
        return;
    }

    final SocketChannel channel = socketChannel.get();
    if (channel == null || !isDeviceConnected()) {
        throw new IOException("Socket closed during read");
    }

    boolean enough = false;
    while (!enough) {
        // Switch to fill mode, keeping unread bytes at the front.
        inputBuffer.compact();
        final int received = channel.read(inputBuffer);
        // Back to read mode before deciding whether the read succeeded.
        inputBuffer.flip();
        if (received <= 0) {
            throw new IOException("Socket probably closed, read return was 0 or less");
        }
        enough = inputBuffer.remaining() >= len;
    }
}
/**
 * Discards the bytes before {@code offset} from a buffer in fill mode: the
 * bytes between {@code offset} and the current position are moved to the
 * start of the buffer, and {@code readBufferOffset} is reset to 0.
 *
 * @param buffer buffer to compact; may be {@code null}
 * @param offset index of the first byte to keep
 * @return the compacted buffer, or {@code null} if {@code buffer} was {@code null}
 */
private ByteBuffer compactReadBuffer(ByteBuffer buffer, int offset) {
    if (buffer != null) {
        // flip() sets limit to the current position; the position is then moved to offset.
        buffer.flip();
        buffer.position(offset);
        buffer = buffer.compact();
        readBufferOffset = 0;
    }
    return buffer;
}
/**
 * Performs one unwrap step of an SSL handshake: reads network data from the
 * channel, unwraps it into the application buffer, and reacts to the
 * resulting engine status.
 *
 * @param socketChannel channel to read handshake data from
 * @param sslEngine     engine performing the handshake
 * @param peerAppData   destination for unwrapped data; may be replaced by a larger buffer
 * @param peerNetData   buffer for raw network data; may be replaced on underflow
 * @param appBufferSize size passed to {@code enlargeBuffer} on overflow; must not be negative
 * @return a holder with the (possibly replaced) buffers and a flag that is
 *         {@code false} for invalid arguments, an SSL error, or an
 *         already-finished shutdown, and {@code true} otherwise
 * @throws IOException if reading from the channel fails
 */
private static HandshakeHolder doHandshakeUnwrap(final SocketChannel socketChannel, final SSLEngine sslEngine,
        ByteBuffer peerAppData, ByteBuffer peerNetData, final int appBufferSize) throws IOException {
    final boolean missingArgument =
        socketChannel == null || sslEngine == null || peerAppData == null || peerNetData == null;
    if (missingArgument || appBufferSize < 0) {
        return new HandshakeHolder(peerAppData, peerNetData, false);
    }

    final boolean endOfStream = socketChannel.read(peerNetData) < 0;
    if (endOfStream) {
        if (sslEngine.isInboundDone() && sslEngine.isOutboundDone()) {
            return new HandshakeHolder(peerAppData, peerNetData, false);
        }
        try {
            sslEngine.closeInbound();
        } catch (SSLException e) {
            s_logger.warn("This SSL engine was forced to close inbound due to end of stream.", e);
        }
        // After closeOutbound the engine is in WRAP state so that it can
        // try to send a close message to the client.
        sslEngine.closeOutbound();
        return new HandshakeHolder(peerAppData, peerNetData, true);
    }

    peerNetData.flip();
    final SSLEngineResult unwrapResult;
    try {
        unwrapResult = sslEngine.unwrap(peerNetData, peerAppData);
        // Keep any bytes the engine did not consume for the next step.
        peerNetData.compact();
    } catch (final SSLException sslException) {
        s_logger.error(String.format("SSL error caught during unwrap data: %s, for local address=%s, remote address=%s. The client may have invalid ca-certificates.",
                sslException.getMessage(), socketChannel.getLocalAddress(), socketChannel.getRemoteAddress()));
        sslEngine.closeOutbound();
        return new HandshakeHolder(peerAppData, peerNetData, false);
    }
    if (unwrapResult == null) {
        return new HandshakeHolder(peerAppData, peerNetData, false);
    }

    final SSLEngineResult.Status status = unwrapResult.getStatus();
    switch (status) {
        case BUFFER_OVERFLOW:
            // peerAppData's capacity is smaller than the data produced by the unwrap.
            peerAppData = enlargeBuffer(peerAppData, appBufferSize);
            break;
        case BUFFER_UNDERFLOW:
            // Either no data was read from the peer, or peerNetData was too
            // small to hold all of the peer's data.
            peerNetData = handleBufferUnderflow(sslEngine, peerNetData);
            break;
        case CLOSED:
            if (sslEngine.isOutboundDone()) {
                return new HandshakeHolder(peerAppData, peerNetData, false);
            }
            sslEngine.closeOutbound();
            break;
        case OK:
            break;
        default:
            throw new IllegalStateException("Invalid SSL status: " + status);
    }
    return new HandshakeHolder(peerAppData, peerNetData, true);
}
/**
 * For every client/server engine pair returned by
 * {@code getSSLEnginesToTest()}, runs wrap/unwrap rounds between the two
 * engines over in-memory buffers until each side has received the other's
 * full application message, then verifies both transfers.
 *
 * @throws Exception if an engine operation or a check fails
 */
public static void run() throws Exception {
    for (SSLEngine[] enginePair : getSSLEnginesToTest()) {
        final SSLEngine clientSSLEngine = enginePair[0];
        final SSLEngine serverSSLEngine = enginePair[1];

        // SSLEngine code based on RedhandshakeFinished.java
        final SSLSession session = clientSSLEngine.getSession();
        final int appBufferMax = session.getApplicationBufferSize();
        final int netBufferMax = session.getPacketBufferSize();

        final ByteBuffer clientIn = ByteBuffer.allocate(appBufferMax + 50);
        final ByteBuffer serverIn = ByteBuffer.allocate(appBufferMax + 50);
        final ByteBuffer cTOs = ByteBuffer.allocateDirect(netBufferMax);
        final ByteBuffer sTOc = ByteBuffer.allocateDirect(netBufferMax);
        final ByteBuffer clientOut = ByteBuffer.wrap(
                "Hi Server, I'm Client".getBytes());
        final ByteBuffer serverOut = ByteBuffer.wrap(
                "Hello Client, I'm Server".getBytes());

        boolean dataDone = false;
        while (!dataDone) {
            // Each side wraps its outbound application data into its network buffer.
            SSLEngineResult clientResult = clientSSLEngine.wrap(clientOut, cTOs);
            runDelegatedTasks(clientResult, clientSSLEngine);
            SSLEngineResult serverResult = serverSSLEngine.wrap(serverOut, sTOc);
            runDelegatedTasks(serverResult, serverSSLEngine);

            cTOs.flip();
            sTOc.flip();

            if (enableDebug) {
                System.out.println("Client -> Network");
                printTlsNetworkPacket("", cTOs);
                System.out.println("");
                System.out.println("Server -> Network");
                printTlsNetworkPacket("", sTOc);
                System.out.println("");
            }

            // Each side unwraps what the other side produced.
            clientResult = clientSSLEngine.unwrap(sTOc, clientIn);
            runDelegatedTasks(clientResult, clientSSLEngine);
            serverResult = serverSSLEngine.unwrap(cTOs, serverIn);
            runDelegatedTasks(serverResult, serverSSLEngine);

            cTOs.compact();
            sTOc.compact();

            // Done once each receiver holds exactly as many bytes as the sender's message.
            final boolean everythingDelivered =
                    clientOut.limit() == serverIn.position()
                    && serverOut.limit() == clientIn.position();
            if (everythingDelivered) {
                checkTransfer(serverOut, clientIn);
                checkTransfer(clientOut, serverIn);
                dataDone = true;
            }
        }
    }
}
/**
 * Unwraps the data held in the network read buffer into the application read
 * buffer, calling the SSL engine again until it reports OK, reports CLOSED,
 * or needs more network data. Any SSLException is rethrown wrapped in a
 * RuntimeException.
 */
private void doUnWrap() {
try {
ByteBuffer netBuffer = netReadBuffer.buffer();
ByteBuffer appBuffer = appReadBuffer.buffer();
// Switch the network buffer to read mode for the engine.
netBuffer.flip();
SSLEngineResult result = sslEngine.unwrap(netBuffer, appBuffer);
boolean closed = false;
while (!closed && result.getStatus() != SSLEngineResult.Status.OK) {
switch (result.getStatus()) {
case BUFFER_OVERFLOW:
// Only logged; unwrap is then retried below with the same buffers.
// NOTE(review): nothing visible here drains or enlarges appBuffer, so
// this may repeat indefinitely if the overflow persists - confirm.
logger.warn("BUFFER_OVERFLOW error");
break;
case BUFFER_UNDERFLOW:
if (netBuffer.limit() == netBuffer.capacity()) {
// NOTE(review): this path returns with the buffer still flipped (no
// compact) - confirm that the caller expects this.
logger.warn("BUFFER_UNDERFLOW error");
} else {
if (logger.isDebugEnabled()) {
logger.debug("BUFFER_UNDERFLOW,continue read:" + netBuffer);
}
// Return the buffer to fill mode while keeping the unread bytes.
if (netBuffer.position() > 0) {
netBuffer.compact();
} else {
// Nothing was consumed: restore fill mode without copying any bytes.
netBuffer.position(netBuffer.limit());
netBuffer.limit(netBuffer.capacity());
}
}
// More network data is needed; stop here without the final compact().
return;
case CLOSED:
logger.warn("doUnWrap Result:" + result.getStatus());
// Ends the loop, but only after one more unwrap call below.
closed = true;
break;
default:
logger.warn("doUnWrap Result:" + result.getStatus());
}
result = sslEngine.unwrap(netBuffer, appBuffer);
}
// Keep any unconsumed network bytes at the front for the next read.
netBuffer.compact();
} catch (SSLException e) {
throw new RuntimeException(e);
}
}
/**
 * Example server: accepts a single connection on port 10000, selects the
 * SSLContext from the SNI name supplied by the client, and prints everything
 * the client sends to stdout.
 *
 * @param args unused
 * @throws IOException              if a socket or channel operation fails
 * @throws GeneralSecurityException declared for the SSLContext setup
 */
public static void main(String[] args) throws IOException, GeneralSecurityException {
    // The SSLContext is a reusable configuration holder.
    final SSLContext sslContext = ContextFactory.authenticatedContext("TLSv1.2");

    /*
     * SSLContext factory as a lambda expression. Every connection is rejected
     * unless the supplied name is a host name that matches exactly, in which
     * case the default context is returned. A real implementation would have
     * more than one context to choose from according to the supplied name.
     */
    SniSslContextFactory exampleSslContextFactory =
        (Optional<SNIServerName> sniServerName) -> {
            final boolean isHostName =
                sniServerName.isPresent() && sniServerName.get() instanceof SNIHostName;
            if (!isHostName) {
                return Optional.empty();
            }
            final SNIHostName hostName = (SNIHostName) sniServerName.get();
            if (!hostName.getAsciiName().equals("domain.com")) {
                return Optional.empty();
            }
            return Optional.of(sslContext);
        };

    // Open and bind the server socket channel in the usual way.
    try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
        serverSocket.socket().bind(new InetSocketAddress(10000));

        // Raw connections are accepted in the usual way.
        System.out.println("Waiting for connection...");
        try (SocketChannel rawChannel = serverSocket.accept()) {

            // The builder combines the raw channel with the SSLContext factory.
            ServerTlsChannel.Builder builder =
                ServerTlsChannel.newBuilder(rawChannel, exampleSslContextFactory);

            try (TlsChannel tlsChannel = builder.build()) {
                // Echo all data sent by the client to stdout.
                final ByteBuffer received = ByteBuffer.allocate(10000);
                for (int n = tlsChannel.read(received); n != -1; n = tlsChannel.read(received)) {
                    received.flip();
                    System.out.print(utf8.decode(received).toString());
                    received.compact();
                }
            }
        }
    }
}