下面列出了io.netty.channel.embedded.EmbeddedChannel#write ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testFlush() {
Lz4FrameEncoder encoder = new Lz4FrameEncoder();
EmbeddedChannel channel = new EmbeddedChannel(encoder);
int size = 27;
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(size, size);
buf.writerIndex(size);
Assert.assertEquals(0, encoder.getBackingBuffer().readableBytes());
channel.write(buf);
Assert.assertTrue(channel.outboundMessages().isEmpty());
Assert.assertEquals(size, encoder.getBackingBuffer().readableBytes());
channel.flush();
Assert.assertTrue(channel.finish());
Assert.assertTrue(channel.releaseOutbound());
Assert.assertFalse(channel.releaseInbound());
}
@Test
public void testAllocatingAroundBlockSize() {
int blockSize = 100;
Lz4FrameEncoder encoder = newEncoder(blockSize, Lz4FrameEncoder.DEFAULT_MAX_ENCODE_SIZE);
EmbeddedChannel channel = new EmbeddedChannel(encoder);
int size = blockSize - 1;
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(size, size);
buf.writerIndex(size);
Assert.assertEquals(0, encoder.getBackingBuffer().readableBytes());
channel.write(buf);
Assert.assertEquals(size, encoder.getBackingBuffer().readableBytes());
int nextSize = size - 1;
buf = ByteBufAllocator.DEFAULT.buffer(nextSize, nextSize);
buf.writerIndex(nextSize);
channel.write(buf);
Assert.assertEquals(size + nextSize - blockSize, encoder.getBackingBuffer().readableBytes());
channel.flush();
Assert.assertEquals(0, encoder.getBackingBuffer().readableBytes());
Assert.assertTrue(channel.finish());
Assert.assertTrue(channel.releaseOutbound());
Assert.assertFalse(channel.releaseInbound());
}
@Test
public void shouldLogChannelWritabilityChanged() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler());
// this is used to switch the channel to become unwritable
channel.config().setWriteBufferLowWaterMark(5);
channel.config().setWriteBufferHighWaterMark(10);
channel.write("hello", channel.newPromise());
verify(appender).doAppend(argThat(new RegexLogMatcher(".+WRITABILITY CHANGED$")));
}
@Test
public void testWritability() {
final StringBuilder buf = new StringBuilder();
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
buf.append(ctx.channel().isWritable());
buf.append(' ');
}
});
ch.config().setWriteBufferLowWaterMark(128 + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD);
ch.config().setWriteBufferHighWaterMark(256 + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD);
ch.write(buffer().writeZero(128));
// Ensure exceeding the low watermark does not make channel unwritable.
ch.write(buffer().writeZero(2));
assertThat(buf.toString(), is(""));
ch.unsafe().outboundBuffer().addFlush();
// Ensure exceeding the high watermark makes channel unwritable.
ch.write(buffer().writeZero(127));
assertThat(buf.toString(), is("false "));
// Ensure going down to the low watermark makes channel writable again by flushing the first write.
assertThat(ch.unsafe().outboundBuffer().remove(), is(true));
assertThat(ch.unsafe().outboundBuffer().remove(), is(true));
assertThat(ch.unsafe().outboundBuffer().totalPendingWriteBytes(),
is(127L + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD));
assertThat(buf.toString(), is("false true "));
safeClose(ch);
}
@Test
public void testMixedWritability() {
final StringBuilder buf = new StringBuilder();
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
buf.append(ctx.channel().isWritable());
buf.append(' ');
}
});
ch.config().setWriteBufferLowWaterMark(128);
ch.config().setWriteBufferHighWaterMark(256);
ChannelOutboundBuffer cob = ch.unsafe().outboundBuffer();
// Trigger channelWritabilityChanged() by writing a lot.
ch.write(buffer().writeZero(257));
assertThat(buf.toString(), is("false "));
// Ensure that setting a user-defined writability flag to false does not trigger channelWritabilityChanged()
cob.setUserDefinedWritability(1, false);
ch.runPendingTasks();
assertThat(buf.toString(), is("false "));
// Ensure reducing the totalPendingWriteBytes down to zero does not trigger channelWritabilityChanged()
// because of the user-defined writability flag.
ch.flush();
assertThat(cob.totalPendingWriteBytes(), is(0L));
assertThat(buf.toString(), is("false "));
// Ensure that setting the user-defined writability flag to true triggers channelWritabilityChanged()
cob.setUserDefinedWritability(1, true);
ch.runPendingTasks();
assertThat(buf.toString(), is("false true "));
safeClose(ch);
}
@Test
public void channelWritabilityChanged() {
EmbeddedChannel channel = new EmbeddedChannel(handler);
channel.config().setWriteBufferHighWaterMark(32768 + 200);
channel.config().setWriteBufferLowWaterMark(32768);
int count = 0;
while(count < 32768 + 200) {
channel.write(Unpooled.copiedBuffer("hello world!".getBytes()));
count += "hello world!".getBytes().length;
}
Assert.assertFalse(channel.isWritable());
verify(session).setWritableState(Session.SessionWritableState.UNWRITABLE);
}
@Test
public void testWritability() {
final StringBuilder buf = new StringBuilder();
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
buf.append(ctx.channel().isWritable());
buf.append(' ');
}
});
ch.config().setWriteBufferLowWaterMark(128);
ch.config().setWriteBufferHighWaterMark(256);
// Ensure exceeding the low watermark does not make channel unwritable.
ch.write(buffer().writeZero(128));
assertThat(buf.toString(), is(""));
ch.unsafe().outboundBuffer().addFlush();
// Ensure exceeding the high watermark makes channel unwritable.
ch.write(buffer().writeZero(128));
assertThat(buf.toString(), is("false "));
// Ensure going down to the low watermark makes channel writable again by flushing the first write.
assertThat(ch.unsafe().outboundBuffer().remove(), is(true));
assertThat(ch.unsafe().outboundBuffer().totalPendingWriteBytes(), is(128L));
assertThat(buf.toString(), is("false true "));
safeClose(ch);
}
@Test
public void testMixedWritability() {
final StringBuilder buf = new StringBuilder();
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
buf.append(ctx.channel().isWritable());
buf.append(' ');
}
});
ch.config().setWriteBufferLowWaterMark(128);
ch.config().setWriteBufferHighWaterMark(256);
ChannelOutboundBuffer cob = ch.unsafe().outboundBuffer();
// Trigger channelWritabilityChanged() by writing a lot.
ch.write(buffer().writeZero(256));
assertThat(buf.toString(), is("false "));
// Ensure that setting a user-defined writability flag to false does not trigger channelWritabilityChanged()
cob.setUserDefinedWritability(1, false);
ch.runPendingTasks();
assertThat(buf.toString(), is("false "));
// Ensure reducing the totalPendingWriteBytes down to zero does not trigger channelWritabilityChannged()
// because of the user-defined writability flag.
ch.flush();
assertThat(cob.totalPendingWriteBytes(), is(0L));
assertThat(buf.toString(), is("false "));
// Ensure that setting the user-defined writability flag to true triggers channelWritabilityChanged()
cob.setUserDefinedWritability(1, true);
ch.runPendingTasks();
assertThat(buf.toString(), is("false true "));
safeClose(ch);
}