下面列出了io.netty.channel.ChannelId#io.netty.channel.ChannelConfig 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void setSoLingerChannelOption() throws IOException {
startServer();
Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>();
// set SO_LINGER option
int soLinger = 123;
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
NettyClientTransport transport = new NettyClientTransport(
address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker());
transports.add(transport);
callMeMaybe(transport.start(clientTransportListener));
// verify SO_LINGER has been set
ChannelConfig config = transport.channel().config();
assertTrue(config instanceof SocketChannelConfig);
assertEquals(soLinger, ((SocketChannelConfig) config).getSoLinger());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
final void readReadyFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearReadFilter0();
} else if (readPending && maybeMoreDataToRead) {
// trigger a read again as there may be something left to read and because of ET we
// will not get notified again until we read everything from the socket
//
// It is possible the last fireChannelRead call could cause the user to call read() again, or if
// autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
// to false before every read operation to prevent re-entry into readReady() we will not read from
// the underlying OS again unless the user happens to call read again.
executeReadReadyRunnable(config);
}
}
final void epollInFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead();
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearEpollIn();
} else if (readPending && maybeMoreDataToRead) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
//
// It is possible the last fireChannelRead call could cause the user to call read() again, or if
// autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
// to false before every read operation to prevent re-entry into epollInReady() we will not read from
// the underlying OS again unless the user happens to call read again.
executeEpollInReadyRunnable(config);
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
// 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
// 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
if (!ch.isWritable()) {
// 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(false);
} else {
// 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
if (logger.isWarnEnabled()) {
logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(true);
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
// 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
// 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
if (!ch.isWritable()) {
// 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(false);
} else {
// 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
if (logger.isWarnEnabled()) {
logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(true);
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
// 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
// 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
if (!ch.isWritable()) {
// 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(false);
} else {
// 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
if (logger.isWarnEnabled()) {
logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(true);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
@Test
public void setSoLingerChannelOption() throws IOException {
startServer();
Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
// set SO_LINGER option
int soLinger = 123;
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group,
newNegotiator(), false, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
new SocketPicker(), new FakeChannelLogger(), false);
transports.add(transport);
callMeMaybe(transport.start(clientTransportListener));
// verify SO_LINGER has been set
ChannelConfig config = transport.channel().config();
assertTrue(config instanceof SocketChannelConfig);
assertEquals(soLinger, ((SocketChannelConfig) config).getSoLinger());
}
static InternalChannelz.SocketOptions getSocketOptions(Channel channel) {
ChannelConfig config = channel.config();
InternalChannelz.SocketOptions.Builder b = new InternalChannelz.SocketOptions.Builder();
// The API allows returning null but not sure if it can happen in practice.
// Let's be paranoid and do null checking just in case.
Integer lingerSeconds = config.getOption(SO_LINGER);
if (lingerSeconds != null) {
b.setSocketOptionLingerSeconds(lingerSeconds);
}
Integer timeoutMillis = config.getOption(SO_TIMEOUT);
if (timeoutMillis != null) {
// in java, SO_TIMEOUT only applies to receiving
b.setSocketOptionTimeoutMillis(timeoutMillis);
}
for (Entry<ChannelOption<?>, Object> opt : config.getOptions().entrySet()) {
ChannelOption<?> key = opt.getKey();
// Constants are pooled, so there should only be one instance of each constant
if (key.equals(SO_LINGER) || key.equals(SO_TIMEOUT)) {
continue;
}
Object value = opt.getValue();
// zpencer: Can a netty option be null?
b.addOption(key.name(), String.valueOf(value));
}
NativeSocketOptions nativeOptions
= NettySocketSupport.getNativeSocketOptions(channel);
if (nativeOptions != null) {
b.setTcpInfo(nativeOptions.tcpInfo); // may be null
for (Entry<String, String> entry : nativeOptions.otherInfo.entrySet()) {
b.addOption(entry.getKey(), entry.getValue());
}
}
return b.build();
}
@Override
public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
RecvByteBufAllocator.ExtendedHandle.class);
}
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public void run() {
Channel channel = ctx.channel();
ChannelConfig config = channel.config();
if (!config.isAutoRead() && isHandlerActive(ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
}
channel.attr(READ_SUSPENDED).set(false);
} else {
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (config.isAutoRead() && !isHandlerActive(ctx)) {
logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
} else {
logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
channel.attr(READ_SUSPENDED).set(false);
config.setAutoRead(true);
channel.read();
}
if (logger.isDebugEnabled()) {
logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
final void executeReadReadyRunnable(ChannelConfig config) {
if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
return;
}
readReadyRunnablePending = true;
eventLoop().execute(readReadyRunnable);
}
@Override
protected void compositeBufferPartialWriteDoesNotCorruptDataInitServerConfig(ChannelConfig config,
int soSndBuf) {
if (config instanceof KQueueChannelConfig) {
((KQueueChannelConfig) config).setMaxBytesPerGatheringWrite(soSndBuf);
}
}
final void executeEpollInReadyRunnable(ChannelConfig config) {
if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) {
return;
}
epollInReadyRunnablePending = true;
eventLoop().execute(epollInReadyRunnable);
}
@Override
protected void compositeBufferPartialWriteDoesNotCorruptDataInitServerConfig(ChannelConfig config,
int soSndBuf) {
if (config instanceof EpollChannelConfig) {
((EpollChannelConfig) config).setMaxBytesPerGatheringWrite(soSndBuf);
}
}
@BeforeMethod
public void setUp() throws Exception {
AmqpChannel channel = Mockito.mock(AmqpChannel.class);
ctx = Mockito.mock(ChannelHandlerContext.class);
Channel mockChannel = Mockito.mock(Channel.class);
Mockito.when(mockChannel.config()).thenReturn(Mockito.mock(ChannelConfig.class));
Mockito.when(ctx.channel()).thenReturn(mockChannel);
channelFlowManager = new ChannelFlowManager(channel, 2, 10);
argumentCaptor = ArgumentCaptor.forClass(ChannelFlow.class);
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
DatagramChannel ch = javaChannel();
ChannelConfig config = config();
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
ByteBuf data = allocHandle.allocate(config.getAllocator());
allocHandle.attemptedBytesRead(data.writableBytes());
boolean free = true;
try {
ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
int pos = nioData.position();
int read = ch.read(nioData);
if (read <= 0) {
return read;
}
allocHandle.lastBytesRead(nioData.position() - pos);
buf.add(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()));
free = false;
return 1;
} catch (Throwable cause) {
PlatformDependent.throwException(cause);
return -1;
} finally {
if (free) {
data.release();
}
}
}
@DataProvider(value = {
"GZIP",
"X_GZIP",
"DEFLATE",
"X_DEFLATE",
"CONTENT_ENCODING_THAT_DOES_NOT_REPRESENT_COMPRESSED_PAYLOAD",
"ENDPOINT_DOES_NOT_WANT_DECOMPRESS",
"NULL_ENDPOINT"
})
@Test
public void newContentDecoder_works_as_expected(NewContentDecoderScenario scenario) throws Exception {
// given
SmartHttpContentDecompressor decompressor = new SmartHttpContentDecompressor();
TestUtil.ChannelHandlerContextMocks mocks = TestUtil.mockChannelHandlerContext();
Whitebox.setInternalState(decompressor, "ctx", mocks.mockContext);
ChannelMetadata channelMetadata = new ChannelMetadata(false);
ChannelConfig channelConfigMock = mock(ChannelConfig.class);
doReturn(scenario.endpoint).when(mocks.mockHttpProcessingState).getEndpointForExecution();
doReturn(channelMetadata).when(mocks.mockChannel).metadata();
doReturn(channelConfigMock).when(mocks.mockChannel).config();
// when
EmbeddedChannel result = decompressor.newContentDecoder(scenario.contentEncoding);
// then
if (scenario.expectValidDecompressor) {
assertThat(result).isNotNull();
}
else {
assertThat(result).isNull();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
private Channel fakeChannel() {
Channel channel = mock(Channel.class);
ChannelConfig config = mock(ChannelConfig.class);
when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", randomPort()));
when(channel.config()).thenReturn(config);
when(config.isAutoRead()).thenReturn(false);
when(channel.writeAndFlush(any())).thenReturn(new DefaultChannelPromise(channel));
return channel;
}
@Override
public void run() {
ChannelConfig config = ctx.channel().config();
if (!config.isAutoRead() && isHandlerActive(ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
}
ctx.attr(READ_SUSPENDED).set(false);
} else {
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (config.isAutoRead() && !isHandlerActive(ctx)) {
logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
} else {
logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
ctx.attr(READ_SUSPENDED).set(false);
config.setAutoRead(true);
ctx.channel().read();
}
if (logger.isDebugEnabled()) {
logger.debug("Unsupsend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
private static void runLineBasedFrameDecoder() {
TcpServer<String, String> transport = Netty4TcpServer.<String, String>create(
0,
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
int bufferSize = 1;
ChannelConfig config = channel.config();
config.setOption(ChannelOption.SO_RCVBUF, bufferSize);
config.setOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize));
channel.pipeline().addFirst(
new LineBasedFrameDecoder(256),
new StringDecoder(CharsetUtil.UTF_8),
new StringEncoder(CharsetUtil.UTF_8));
}
});
ReactorTcpServer.create(transport).start(connection -> {
connection.log("input")
.observeComplete(v -> LOG.info("Connection input complete"))
.capacity(1)
.consume(line -> {
String response = "Hello " + line + "\n";
Streams.wrap(connection.writeWith(Streams.just(response))).consume();
});
return Streams.never();
});
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Request) {
ChannelConfig config = ctx.channel().config();
if (++outstanding >= high && config.isAutoRead()) {
config.setAutoRead(false);
}
}
ctx.fireChannelRead(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof Response) {
ChannelConfig config = ctx.channel().config();
if (--outstanding <= low && !config.isAutoRead()) {
config.setAutoRead(true);
}
}
ctx.write(msg, promise);
}
private <T extends Channel> T configure(T channel) {
ChannelConfig channelConfig = channel.config();
if (nonNull(connectTimeout)) {
channelConfig.setConnectTimeoutMillis(connectTimeout);
}
if (nonNull(readTimeout)) {
channel.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler());
}
return channel;
}
static InternalChannelz.SocketOptions getSocketOptions(Channel channel) {
ChannelConfig config = channel.config();
InternalChannelz.SocketOptions.Builder b = new InternalChannelz.SocketOptions.Builder();
// The API allows returning null but not sure if it can happen in practice.
// Let's be paranoid and do null checking just in case.
Integer lingerSeconds = config.getOption(SO_LINGER);
if (lingerSeconds != null) {
b.setSocketOptionLingerSeconds(lingerSeconds);
}
Integer timeoutMillis = config.getOption(SO_TIMEOUT);
if (timeoutMillis != null) {
// in java, SO_TIMEOUT only applies to receiving
b.setSocketOptionTimeoutMillis(timeoutMillis);
}
for (Entry<ChannelOption<?>, Object> opt : config.getOptions().entrySet()) {
ChannelOption<?> key = opt.getKey();
// Constants are pooled, so there should only be one instance of each constant
if (key.equals(SO_LINGER) || key.equals(SO_TIMEOUT)) {
continue;
}
Object value = opt.getValue();
// zpencer: Can a netty option be null?
b.addOption(key.name(), String.valueOf(value));
}
NativeSocketOptions nativeOptions
= NettySocketSupport.getNativeSocketOptions(channel);
if (nativeOptions != null) {
b.setTcpInfo(nativeOptions.tcpInfo); // may be null
for (Entry<String, String> entry : nativeOptions.otherInfo.entrySet()) {
b.addOption(entry.getKey(), entry.getValue());
}
}
return b.build();
}
/**
* Returns the configuration of this channel.
*/
public ChannelConfig config() {
return channel.config();
}
protected void compositeBufferPartialWriteDoesNotCorruptDataInitServerConfig(ChannelConfig config,
int soSndBuf) {
}