下面列出了io.netty.channel.ChannelId#io.netty.channel.ServerChannel 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private Server(
AddressResolver addressResolver,
EventLoopGroup eventLoopGroup,
Class<? extends ServerChannel> channelClass,
boolean customEventLoop,
Timer timer,
boolean customTimer,
long bindTimeoutInNanos,
StubStore stubStore,
boolean activityLogging) {
this(
addressResolver,
eventLoopGroup,
customEventLoop,
timer,
customTimer,
bindTimeoutInNanos,
stubStore,
activityLogging,
new ServerBootstrap()
.group(eventLoopGroup)
.channel(channelClass)
.childHandler(new Initializer()));
}
/**
* Start proxy server
* */
public void start()
throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(_acceptorGroup, _upstreamWorkerGroup);
serverBootstrap.channelFactory(new ChannelFactory<ServerChannel>() {
@Override
public ServerChannel newChannel() {
return new NioServerSocketChannel();
}
});
serverBootstrap.childHandler(new ProxyInitializer(this));
//bind
ChannelFuture future = serverBootstrap.bind(_host, _port);
//wait for the future
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.channel().closeFuture().awaitUninterruptibly();
throw new ChannelException(String.format("Failed to bind to: %s:%d", _host, _port), future.cause());
} else {
_allChannels.add(future.channel());
}
}
@Override
public boolean remove(Object o) {
if (!(o instanceof Channel)) {
return false;
}
boolean removed;
Channel c = (Channel) o;
if (c instanceof ServerChannel) {
removed = serverChannels.remove(c);
} else {
removed = nonServerChannels.remove(c);
}
if (!removed) {
return false;
}
c.closeFuture().removeListener(remover);
return true;
}
private void buildServerAndRun(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
Class<? extends ServerChannel> channelClass) throws Exception {
var b = new ServerBootstrap();
try {
b.group(bossGroup, workerGroup)
.channel(channelClass)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(getChannelInitializer());
var listenTo = (listenAddress == null || listenAddress.isEmpty())
? new InetSocketAddress(port)
: new InetSocketAddress(listenAddress, port);
this.cf = b.bind(listenTo).sync();
} catch (Exception e) {
log.error("Error initializing {}, port {}", getServerName(), port, e);
throw e;
}
log.info("{} server listening at {} port.", getServerName(), port);
}
private static ServerChannel createServer(
Class<? extends ServerChannel> serverChannelClass,
IntFunction<EventLoopGroup> newEventLoopGroup,
SocketAddress socketAddress,
ChannelHandler handler) {
EventLoopGroup eventLoop = newEventLoopGroup.apply(1);
ServerBootstrap sb =
new ServerBootstrap()
.group(eventLoop)
.channel(serverChannelClass)
.childHandler(
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(1000));
ch.pipeline().addLast(handler);
}
});
try {
return ((ServerChannel) sb.bind(socketAddress).sync().channel());
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
public UnixDomainServer(
Class<? extends ServerChannel> serverChannelClass,
IntFunction<EventLoopGroup> newEventLoopGroup) {
EventLoopGroup eventLoop = newEventLoopGroup.apply(1);
ServerBootstrap sb =
new ServerBootstrap()
.group(eventLoop)
.channel(serverChannelClass)
.childHandler(
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(1000));
ch.pipeline().addLast(Preconditions.checkNotNull(handler));
}
});
try {
ServerChannel actual = ((ServerChannel) sb.bind(newDomainSocketAddress()).sync().channel());
this.serverChannel = mock(ServerChannel.class, AdditionalAnswers.delegatesTo(actual));
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Test(expected = UploadTimeoutException.class, timeout = 30000)
public void uploadTimeout() throws Exception {
ServerChannel server = null;
try {
server =
testServer.start(
new SimpleChannelInboundHandler<FullHttpRequest>() {
@Override
protected void channelRead0(
ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
// Don't respond and force a client timeout.
}
});
Credentials credentials = newCredentials();
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
getFromFuture(blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data)));
fail("Exception expected");
} finally {
testServer.stop(server);
}
}
@Test(expected = DownloadTimeoutException.class, timeout = 30000)
public void downloadTimeout() throws Exception {
ServerChannel server = null;
try {
server =
testServer.start(
new SimpleChannelInboundHandler<FullHttpRequest>() {
@Override
protected void channelRead0(
ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
// Don't respond and force a client timeout.
}
});
Credentials credentials = newCredentials();
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
getFromFuture(blobStore.downloadBlob(DIGEST, new ByteArrayOutputStream()));
fail("Exception expected");
} finally {
testServer.stop(server);
}
}
private void expiredAuthTokensShouldBeRetried_get(
HttpCacheClientTest.NotAuthorizedHandler.ErrorType errorType) throws Exception {
ServerChannel server = null;
try {
server = testServer.start(new NotAuthorizedHandler(errorType));
Credentials credentials = newCredentials();
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream());
getFromFuture(blobStore.downloadBlob(DIGEST, out));
assertThat(out.toString(Charsets.US_ASCII.name())).isEqualTo("File Contents");
verify(credentials, times(1)).refresh();
verify(credentials, times(2)).getRequestMetadata(any(URI.class));
verify(credentials, times(2)).hasRequestMetadata();
// The caller is responsible to the close the stream.
verify(out, never()).close();
verifyNoMoreInteractions(credentials);
} finally {
testServer.stop(server);
}
}
private void expiredAuthTokensShouldBeRetried_put(
HttpCacheClientTest.NotAuthorizedHandler.ErrorType errorType) throws Exception {
ServerChannel server = null;
try {
server = testServer.start(new NotAuthorizedHandler(errorType));
Credentials credentials = newCredentials();
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data)).get();
verify(credentials, times(1)).refresh();
verify(credentials, times(2)).getRequestMetadata(any(URI.class));
verify(credentials, times(2)).hasRequestMetadata();
verifyNoMoreInteractions(credentials);
} finally {
testServer.stop(server);
}
}
private void errorCodeThatShouldNotBeRetried_get(
HttpCacheClientTest.NotAuthorizedHandler.ErrorType errorType) {
ServerChannel server = null;
try {
server = testServer.start(new NotAuthorizedHandler(errorType));
Credentials credentials = newCredentials();
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
getFromFuture(blobStore.downloadBlob(DIGEST, new ByteArrayOutputStream()));
fail("Exception expected.");
} catch (Exception e) {
assertThat(e).isInstanceOf(HttpException.class);
assertThat(((HttpException) e).response().status())
.isEqualTo(HttpResponseStatus.UNAUTHORIZED);
} finally {
testServer.stop(server);
}
}
private void errorCodeThatShouldNotBeRetried_put(
HttpCacheClientTest.NotAuthorizedHandler.ErrorType errorType) {
ServerChannel server = null;
try {
server = testServer.start(new NotAuthorizedHandler(errorType));
Credentials credentials = newCredentials();
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
byte[] oneByte = new byte[] {0};
getFromFuture(
blobStore.uploadBlob(DIGEST_UTIL.compute(oneByte), ByteString.copyFrom(oneByte)));
fail("Exception expected.");
} catch (Exception e) {
assertThat(e).isInstanceOf(HttpException.class);
assertThat(((HttpException) e).response().status())
.isEqualTo(HttpResponseStatus.UNAUTHORIZED);
} finally {
testServer.stop(server);
}
}
private void doRun(EventLoopGroup loupGroup, Class<? extends ServerChannel> serverChannelClass) throws InterruptedException {
try {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
b.option(ChannelOption.SO_REUSEADDR, true);
b.group(loupGroup).channel(serverChannelClass).childHandler(new HelloServerInitializer(loupGroup.next()));
b.option(ChannelOption.MAX_MESSAGES_PER_READ, Integer.MAX_VALUE);
b.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.MAX_MESSAGES_PER_READ, Integer.MAX_VALUE);
Channel ch = b.bind(port).sync().channel();
ch.closeFuture().sync();
} finally {
loupGroup.shutdownGracefully().sync();
}
}
private void doRun(EventLoopGroup loupGroup, Class<? extends ServerChannel> serverChannelClass, IoMultiplexer multiplexer) throws InterruptedException {
try {
InetSocketAddress inet = new InetSocketAddress(port);
ServerBootstrap b = new ServerBootstrap();
if (multiplexer == IoMultiplexer.EPOLL) {
b.option(EpollChannelOption.SO_REUSEPORT, true);
}
b.option(ChannelOption.SO_BACKLOG, 8192);
b.option(ChannelOption.SO_REUSEADDR, true);
b.group(loupGroup).channel(serverChannelClass).childHandler(new HelloServerInitializer(loupGroup.next()));
b.childOption(ChannelOption.SO_REUSEADDR, true);
Channel ch = b.bind(inet).sync().channel();
System.out.printf("Httpd started. Listening on: %s%n", inet.toString());
ch.closeFuture().sync();
} finally {
loupGroup.shutdownGracefully().sync();
}
}
NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType,
Map<ChannelOption<?>, ?> channelOptions,
@Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
long maxConnectionIdleInNanos,
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
InternalChannelz channelz) {
this.address = address;
this.channelType = checkNotNull(channelType, "channelType");
checkNotNull(channelOptions, "channelOptions");
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.usingSharedBossGroup = bossGroup == null;
this.usingSharedWorkerGroup = workerGroup == null;
this.transportTracerFactory = transportTracerFactory;
this.maxStreamsPerConnection = maxStreamsPerConnection;
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
this.keepAliveTimeInNanos = keepAliveTimeInNanos;
this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
this.channelz = Preconditions.checkNotNull(channelz);
}
public Class<? extends ServerChannel> getServerChannel() {
if (useNativeIo && Epoll.isAvailable()) {
return uds ? EpollServerDomainSocketChannel.class : EpollServerSocketChannel.class;
} else if (useNativeIo && KQueue.isAvailable()) {
return uds ? KQueueServerDomainSocketChannel.class : KQueueServerSocketChannel.class;
}
return NioServerSocketChannel.class;
}
@Override
public boolean contains(Object o) {
if (o instanceof Channel) {
Channel c = (Channel) o;
if (o instanceof ServerChannel) {
return serverChannels.containsValue(c);
} else {
return nonServerChannels.containsValue(c);
}
} else {
return false;
}
}
@Override
public boolean add(Channel channel) {
ConcurrentMap<ChannelId, Channel> map =
channel instanceof ServerChannel? serverChannels : nonServerChannels;
boolean added = map.putIfAbsent(channel.id(), channel) == null;
if (added) {
channel.closeFuture().addListener(remover);
}
if (stayClosed && closed) {
// First add channel, than check if closed.
// Seems inefficient at first, but this way a volatile
// gives us enough synchronization to be thread-safe.
//
// If true: Close right away.
// (Might be closed a second time by ChannelGroup.close(), but this is ok)
//
// If false: Channel will definitely be closed by the ChannelGroup.
// (Because closed=true always happens-before ChannelGroup.close())
//
// See https://github.com/netty/netty/issues/4020
channel.close();
}
return added;
}
protected boolean closeOnReadError(Throwable cause) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
return cause instanceof IOException &&
!(cause instanceof PortUnreachableException) &&
!(this instanceof ServerChannel);
}
/**
* Returns the correct {@link Class} to use with the given {@link EventLoopGroup}.
*
* @param group the {@link EventLoopGroup} for which the class is needed
* @param addressClass The class of the address that the server socket will be bound to.
* @return the class that should be used for bootstrapping
*/
public static Class<? extends ServerChannel> serverChannel(EventLoopGroup group,
Class<? extends SocketAddress> addressClass) {
if (useEpoll(group)) {
return DomainSocketAddress.class.isAssignableFrom(addressClass) ? EpollServerDomainSocketChannel.class :
EpollServerSocketChannel.class;
} else if (useKQueue(group)) {
return DomainSocketAddress.class.isAssignableFrom(addressClass) ? KQueueServerDomainSocketChannel.class :
KQueueServerSocketChannel.class;
} else {
return NioServerSocketChannel.class;
}
}
public Class<? extends ServerChannel> getServerChannel() {
if (useNativeIo && Epoll.isAvailable()) {
return uds ? EpollServerDomainSocketChannel.class : EpollServerSocketChannel.class;
} else if (useNativeIo && KQueue.isAvailable()) {
return uds ? KQueueServerDomainSocketChannel.class : KQueueServerSocketChannel.class;
}
return NioServerSocketChannel.class;
}
private NettyExecutor(EventLoopGroup eventLoopGroup,
Class<? extends ServerChannel> serverEventLoopClass,
Class<? extends SocketChannel> clientEventLoopClass) {
this.serverEventLoopClass = serverEventLoopClass;
this.clientEventLoopClass = clientEventLoopClass;
this.eventLoopGroup = eventLoopGroup;
}
private static void doRun(EventLoopGroup loupGroup, Class<? extends ServerChannel> serverChannelClass,
boolean isEpoll) throws InterruptedException {
try {
InetSocketAddress inet = new InetSocketAddress(port);
ServerBootstrap b = new ServerBootstrap();
if (isEpoll) {
b.option(EpollChannelOption.SO_REUSEPORT, true);
}
b.option(ChannelOption.SO_BACKLOG, 1024 * 8);
b.option(ChannelOption.SO_REUSEADDR, true);
b.option(ChannelOption.SO_RCVBUF, 256 * 1024);
b.group(loupGroup).channel(serverChannelClass).childHandler(new BenchmarkChannelInitializer());
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
b.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
Channel ch = b.bind(inet).sync().channel();
System.out.printf("Httpd started. Listening on: %s%n", inet.toString());
ch.closeFuture().sync();
} finally {
loupGroup.shutdownGracefully().sync();
}
}
private void executeBootstrap(ScheduledExecutor scheduledExecutor, int port, boolean useWebSocket, boolean useSsl)
throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
Class<? extends ServerChannel> serverChannelClass;
if (Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode())) {
serverChannelClass = EpollServerSocketChannel.class;
}
else {
serverChannelClass = NioServerSocketChannel.class;
}
bootstrap = bootstrap.group(bossGroup, workerGroup).channel(serverChannelClass);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
if (scheduledExecutor != null) {
bootstrap.handler(scheduledExecutor);
}
bootstrap.childHandler(new MqttChannelInitializer(useWebSocket, useSsl));
bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
// setting buffer size can improve I/O
.childOption(ChannelOption.SO_SNDBUF, 1048576).childOption(ChannelOption.SO_RCVBUF, 1048576)
// recommended in
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#11.0
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.bind(port).sync();
}
@Override
public boolean contains(Object o) {
if (o instanceof Channel) {
Channel c = (Channel) o;
if (o instanceof ServerChannel) {
return serverChannels.contains(c);
} else {
return nonServerChannels.contains(c);
}
} else {
return false;
}
}
@Override
public boolean add(Channel channel) {
ConcurrentSet<Channel> set =
channel instanceof ServerChannel? serverChannels : nonServerChannels;
boolean added = set.add(channel);
if (added) {
channel.closeFuture().addListener(remover);
}
return added;
}
ServerChannelConfiguration(
EventLoopGroup bossGroup,
EventLoopGroup workerGroup,
Class<? extends ServerChannel> channelClass) {
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.channelClass = channelClass;
}
public ServerChannel start(ChannelInboundHandler handler) {
return createServer(
NioServerSocketChannel.class,
NioEventLoopGroup::new,
new InetSocketAddress("localhost", 0),
handler);
}
public void stop(ServerChannel serverChannel) {
try {
serverChannel.close();
serverChannel.closeFuture().sync();
serverChannel.eventLoop().shutdownGracefully().sync();
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
public void stop(ServerChannel serverChannel) {
// Note: In the tests, we expect that connecting to a closed server channel results
// in a channel connection error. Netty doesn't seem to handle closing domain socket
// addresses very well-- often connecting to a closed domain socket will result in a
// read timeout instead of a connection timeout.
//
// This is a hack to ensure connection timeouts are "received" by the tests for this
// dummy domain socket server. In particular, this lets the timeoutShouldWork_connect
// test work for both inet and domain sockets.
//
// This is also part of the workaround for https://github.com/netty/netty/issues/7047.
when(this.serverChannel.localAddress()).thenReturn(new DomainSocketAddress(""));
this.handler = null;
}