下面列出了怎么用io.netty.util.ByteProcessor的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testForward() {
final ByteBuf buf =
Unpooled.copiedBuffer("abc\r\n\ndef\r\rghi\n\njkl\0\0mno \t\tx", CharsetUtil.ISO_8859_1);
final int length = buf.readableBytes();
assertEquals(3, buf.forEachByte(0, length, ByteProcessor.FIND_CRLF));
assertEquals(6, buf.forEachByte(3, length - 3, ByteProcessor.FIND_NON_CRLF));
assertEquals(9, buf.forEachByte(6, length - 6, ByteProcessor.FIND_CR));
assertEquals(11, buf.forEachByte(9, length - 9, ByteProcessor.FIND_NON_CR));
assertEquals(14, buf.forEachByte(11, length - 11, ByteProcessor.FIND_LF));
assertEquals(16, buf.forEachByte(14, length - 14, ByteProcessor.FIND_NON_LF));
assertEquals(19, buf.forEachByte(16, length - 16, ByteProcessor.FIND_NUL));
assertEquals(21, buf.forEachByte(19, length - 19, ByteProcessor.FIND_NON_NUL));
assertEquals(24, buf.forEachByte(19, length - 19, ByteProcessor.FIND_ASCII_SPACE));
assertEquals(24, buf.forEachByte(21, length - 21, ByteProcessor.FIND_LINEAR_WHITESPACE));
assertEquals(28, buf.forEachByte(24, length - 24, ByteProcessor.FIND_NON_LINEAR_WHITESPACE));
assertEquals(-1, buf.forEachByte(28, length - 28, ByteProcessor.FIND_LINEAR_WHITESPACE));
buf.release();
}
@Test
public void testBackward() {
final ByteBuf buf =
Unpooled.copiedBuffer("abc\r\n\ndef\r\rghi\n\njkl\0\0mno \t\tx", CharsetUtil.ISO_8859_1);
final int length = buf.readableBytes();
assertEquals(27, buf.forEachByteDesc(0, length, ByteProcessor.FIND_LINEAR_WHITESPACE));
assertEquals(25, buf.forEachByteDesc(0, length, ByteProcessor.FIND_ASCII_SPACE));
assertEquals(23, buf.forEachByteDesc(0, 28, ByteProcessor.FIND_NON_LINEAR_WHITESPACE));
assertEquals(20, buf.forEachByteDesc(0, 24, ByteProcessor.FIND_NUL));
assertEquals(18, buf.forEachByteDesc(0, 21, ByteProcessor.FIND_NON_NUL));
assertEquals(15, buf.forEachByteDesc(0, 19, ByteProcessor.FIND_LF));
assertEquals(13, buf.forEachByteDesc(0, 16, ByteProcessor.FIND_NON_LF));
assertEquals(10, buf.forEachByteDesc(0, 14, ByteProcessor.FIND_CR));
assertEquals(8, buf.forEachByteDesc(0, 11, ByteProcessor.FIND_NON_CR));
assertEquals(5, buf.forEachByteDesc(0, 9, ByteProcessor.FIND_CRLF));
assertEquals(2, buf.forEachByteDesc(0, 6, ByteProcessor.FIND_NON_CRLF));
assertEquals(-1, buf.forEachByteDesc(0, 3, ByteProcessor.FIND_CRLF));
buf.release();
}
@Test
public void testForEachByte() {
buffer.clear();
for (int i = 0; i < CAPACITY; i ++) {
buffer.writeByte(i + 1);
}
final AtomicInteger lastIndex = new AtomicInteger();
buffer.setIndex(CAPACITY / 4, CAPACITY * 3 / 4);
assertThat(buffer.forEachByte(new ByteProcessor() {
int i = CAPACITY / 4;
@Override
public boolean process(byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
lastIndex.set(i);
i ++;
return true;
}
}), is(-1));
assertThat(lastIndex.get(), is(CAPACITY * 3 / 4 - 1));
}
@Test
public void testForEachByteAbort() {
buffer.clear();
for (int i = 0; i < CAPACITY; i ++) {
buffer.writeByte(i + 1);
}
final int stop = CAPACITY / 2;
assertThat(buffer.forEachByte(CAPACITY / 3, CAPACITY / 3, new ByteProcessor() {
int i = CAPACITY / 3;
@Override
public boolean process(byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
if (i == stop) {
return false;
}
i++;
return true;
}
}), is(stop));
}
@Test
public void testForEachByteDesc() {
buffer.clear();
for (int i = 0; i < CAPACITY; i ++) {
buffer.writeByte(i + 1);
}
final AtomicInteger lastIndex = new AtomicInteger();
assertThat(buffer.forEachByteDesc(CAPACITY / 4, CAPACITY * 2 / 4, new ByteProcessor() {
int i = CAPACITY * 3 / 4 - 1;
@Override
public boolean process(byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
lastIndex.set(i);
i --;
return true;
}
}), is(-1));
assertThat(lastIndex.get(), is(CAPACITY / 4));
}
@Test
public void testForEachByteDesc2() {
byte[] expected = {1, 2, 3, 4};
ByteBuf buf = newBuffer(expected.length);
try {
buf.writeBytes(expected);
final byte[] bytes = new byte[expected.length];
int i = buf.forEachByteDesc(new ByteProcessor() {
private int index = bytes.length - 1;
@Override
public boolean process(byte value) throws Exception {
bytes[index--] = value;
return true;
}
});
assertEquals(-1, i);
assertArrayEquals(expected, bytes);
} finally {
buf.release();
}
}
@Test
public void testForEachByte2() {
byte[] expected = {1, 2, 3, 4};
ByteBuf buf = newBuffer(expected.length);
try {
buf.writeBytes(expected);
final byte[] bytes = new byte[expected.length];
int i = buf.forEachByte(new ByteProcessor() {
private int index;
@Override
public boolean process(byte value) throws Exception {
bytes[index++] = value;
return true;
}
});
assertEquals(-1, i);
assertArrayEquals(expected, bytes);
} finally {
buf.release();
}
}
@Override
public int forEachByte(int index, int length, ByteProcessor processor) {
final int writerIndex = buffer.writerIndex();
if (index >= writerIndex) {
throw REPLAY;
}
if (index <= writerIndex - length) {
return buffer.forEachByte(index, length, processor);
}
int ret = buffer.forEachByte(index, writerIndex - index, processor);
if (ret < 0) {
throw REPLAY;
} else {
return ret;
}
}
private static ByteBuf readLine(ByteBuf in) {
if (!in.isReadable(RedisConstants.EOL_LENGTH)) {
return null;
}
final int lfIndex = in.forEachByte(ByteProcessor.FIND_LF);
if (lfIndex < 0) {
return null;
}
ByteBuf data = in.readSlice(lfIndex - in.readerIndex() - 1); // `-1` is for CR
readEndOfLine(in); // validate CR LF
return data;
}
@Override
public int forEachByte(int index, int length, ByteProcessor processor) {
checkIndex0(index, length);
int ret = unwrap().forEachByte(idx(index), length, processor);
if (ret >= adjustment) {
return ret - adjustment;
} else {
return -1;
}
}
@Override
public int forEachByteDesc(int index, int length, ByteProcessor processor) {
checkIndex0(index, length);
int ret = unwrap().forEachByteDesc(idx(index), length, processor);
if (ret >= adjustment) {
return ret - adjustment;
} else {
return -1;
}
}
@Override
public int forEachByte(int index, int length, ByteProcessor processor) {
checkIndex0(index, length);
int ret = unwrap().forEachByte(idx(index), length, processor);
if (ret < adjustment) {
return -1;
}
return ret - adjustment;
}
@Override
public int forEachByteDesc(int index, int length, ByteProcessor processor) {
checkIndex0(index, length);
int ret = unwrap().forEachByteDesc(idx(index), length, processor);
if (ret < adjustment) {
return -1;
}
return ret - adjustment;
}
@Override
public int forEachByte(ByteProcessor processor) {
ensureAccessible();
try {
return forEachByteAsc0(readerIndex, writerIndex, processor);
} catch (Exception e) {
PlatformDependent.throwException(e);
return -1;
}
}
@Override
public int forEachByte(int index, int length, ByteProcessor processor) {
checkIndex(index, length);
try {
return forEachByteAsc0(index, index + length, processor);
} catch (Exception e) {
PlatformDependent.throwException(e);
return -1;
}
}
private int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception {
for (; start < end; ++start) {
if (!processor.process(_getByte(start))) {
return start;
}
}
return -1;
}
@Override
public int forEachByteDesc(ByteProcessor processor) {
ensureAccessible();
try {
return forEachByteDesc0(writerIndex - 1, readerIndex, processor);
} catch (Exception e) {
PlatformDependent.throwException(e);
return -1;
}
}
@Override
public int forEachByteDesc(int index, int length, ByteProcessor processor) {
checkIndex(index, length);
try {
return forEachByteDesc0(index + length - 1, index, processor);
} catch (Exception e) {
PlatformDependent.throwException(e);
return -1;
}
}
private int forEachByteDesc0(int rStart, final int rEnd, ByteProcessor processor) throws Exception {
for (; rStart >= rEnd; --rStart) {
if (!processor.process(_getByte(rStart))) {
return rStart;
}
}
return -1;
}
private static int firstIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
fromIndex = Math.max(fromIndex, 0);
if (fromIndex >= toIndex || buffer.capacity() == 0) {
return -1;
}
return buffer.forEachByte(fromIndex, toIndex - fromIndex, new ByteProcessor.IndexOfProcessor(value));
}
private static int lastIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
fromIndex = Math.min(fromIndex, buffer.capacity());
if (fromIndex < 0 || buffer.capacity() == 0) {
return -1;
}
return buffer.forEachByteDesc(toIndex, fromIndex - toIndex, new ByteProcessor.IndexOfProcessor(value));
}
/**
* Returns the index in the buffer of the end of line found.
* Returns -1 if no end of line was found in the buffer.在找到的行末尾的缓冲区中返回索引。如果在缓冲区中找不到行尾,返回-1。
*/
private int findEndOfLine(final ByteBuf buffer) {
// 读取的字节数
int totalLength = buffer.readableBytes();
int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);
if (i >= 0) {
offset = 0;
if (i > 0 && buffer.getByte(i - 1) == '\r') {
i--;
}
} else {
offset = totalLength;
}
return i;
}
@Override
public int forEachByte(ByteProcessor processor) {
int ret = buffer.forEachByte(processor);
if (ret < 0) {
throw REPLAY;
} else {
return ret;
}
}
@Override
public int forEachByteDesc(ByteProcessor processor) {
if (terminated) {
return buffer.forEachByteDesc(processor);
} else {
throw reject();
}
}
@Override
public int forEachByteDesc(int index, int length, ByteProcessor processor) {
if (index + length > buffer.writerIndex()) {
throw REPLAY;
}
return buffer.forEachByteDesc(index, length, processor);
}
/**
* 查找换行符
*/
@Test
public void findByteTest() {
ByteBuf byteBuf = Unpooled.copiedBuffer("abc\r", CharsetUtil.UTF_8);
int index = byteBuf.forEachByte(ByteProcessor.FIND_CR);
System.out.println(index);
}
private static int findCrLfCrLf(ByteBuf buffer) {
int totalLength = buffer.readableBytes();
int readerIndex = buffer.readerIndex();
int i = buffer.forEachByte(readerIndex, totalLength, ByteProcessor.FIND_LF);
if (i > 0 && buffer.getByte(i - 1) == '\r') {
int more = readerIndex + totalLength - i;
if (more > 1 && buffer.getByte(i + 1) == '\r' && buffer.getByte(i + 2) == '\n') {
return i - 1;
}
}
return -1;
}
private static int findDotAtBeginningOfLine(ByteBuf buffer, int startAt, byte[] previousBytes) {
int length = buffer.readableBytes();
if (previousBytes[0] == CR && previousBytes[1] == LF && buffer.getByte(startAt) == DOT) {
return startAt;
}
if (previousBytes[1] == CR && length >= 2 && buffer.getByte(startAt) == LF && buffer.getByte(startAt + 1) == DOT) {
return startAt + 1;
}
int i = startAt;
while (++i < length) {
i = buffer.forEachByte(i, length - i, ByteProcessor.FIND_LF);
if (i == -1) {
return -1;
}
if (buffer.getByte(i - 1) == CR) {
if (i + 1 < length && buffer.getByte(i + 1) == DOT) {
return i + 1;
} else {
continue;
}
}
}
return -1;
}
@Override
public int forEachByteDesc(final int n, final int n2, final ByteProcessor byteProcessor) {
if ((n + n2) > this.buffer.writerIndex()) {
throw EOF;
}
return this.buffer.forEachByteDesc(n, n2, byteProcessor);
}
@Nullable
private static HttpHeaders parseGrpcWebTrailers(ByteBuf buf) {
final HttpHeadersBuilder trailers = HttpHeaders.builder();
while (buf.readableBytes() > 0) {
int start = buf.forEachByte(ByteProcessor.FIND_NON_LINEAR_WHITESPACE);
if (start == -1) {
return null;
}
int endExclusive;
if (buf.getByte(start) == ':') {
// We need to skip the pseudoheader colon when searching for the separator.
buf.skipBytes(1);
endExclusive = buf.forEachByte(FIND_COLON);
buf.readerIndex(start);
} else {
endExclusive = buf.forEachByte(FIND_COLON);
}
if (endExclusive == -1) {
return null;
}
final CharSequence name = buf.readCharSequence(endExclusive - start, StandardCharsets.UTF_8);
buf.readerIndex(endExclusive + 1);
start = buf.forEachByte(ByteProcessor.FIND_NON_LINEAR_WHITESPACE);
buf.readerIndex(start);
endExclusive = buf.forEachByte(ByteProcessor.FIND_CRLF);
final CharSequence value = buf.readCharSequence(endExclusive - start, StandardCharsets.UTF_8);
trailers.add(name, value.toString());
start = buf.forEachByte(ByteProcessor.FIND_NON_CRLF);
if (start != -1) {
buf.readerIndex(start);
} else {
// Nothing but CRLF remaining, we're done.
buf.skipBytes(buf.readableBytes());
}
}
return trailers.build();
}