下面列出了怎么用io.netty.util.concurrent.DefaultEventExecutorGroup的API类实例代码及写法,或者点击链接到github查看源代码。
@Test(timeout = 3000)
public void testHandlerAddedExceptionFromChildHandlerIsPropagated() {
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
try {
final Promise<Void> promise = group1.next().newPromise();
final AtomicBoolean handlerAdded = new AtomicBoolean();
final Exception exception = new RuntimeException();
ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
pipeline.addFirst(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handlerAdded.set(true);
throw exception;
}
});
assertFalse(handlerAdded.get());
group.register(pipeline.channel());
promise.syncUninterruptibly();
} finally {
group1.shutdownGracefully();
}
}
@Test(timeout = 3000)
public void testHandlerRemovedExceptionFromChildHandlerIsPropagated() {
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
try {
final Promise<Void> promise = group1.next().newPromise();
String handlerName = "foo";
final Exception exception = new RuntimeException();
ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.addLast(handlerName, new ChannelHandlerAdapter() {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
throw exception;
}
});
pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
group.register(pipeline.channel()).syncUninterruptibly();
pipeline.remove(handlerName);
promise.syncUninterruptibly();
} finally {
group1.shutdownGracefully();
}
}
@Test
public void testPinExecutor() {
EventExecutorGroup group = new DefaultEventExecutorGroup(2);
ChannelPipeline pipeline = new LocalChannel().pipeline();
ChannelPipeline pipeline2 = new LocalChannel().pipeline();
pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter());
pipeline2.addLast(group, "h3", new ChannelInboundHandlerAdapter());
EventExecutor executor1 = pipeline.context("h1").executor();
EventExecutor executor2 = pipeline.context("h2").executor();
assertNotNull(executor1);
assertNotNull(executor2);
assertSame(executor1, executor2);
EventExecutor executor3 = pipeline2.context("h3").executor();
assertNotNull(executor3);
assertNotSame(executor3, executor2);
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
@Test
public void testNotPinExecutor() {
EventExecutorGroup group = new DefaultEventExecutorGroup(2);
ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.channel().config().setOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false);
pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter());
EventExecutor executor1 = pipeline.context("h1").executor();
EventExecutor executor2 = pipeline.context("h2").executor();
assertNotNull(executor1);
assertNotNull(executor2);
assertNotSame(executor1, executor2);
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
/**
* 启动netty客户端
*/
@Override
public void start(TxConfig txConfig) {
this.retryMax = txConfig.getRetryMax();
this.retryInterval = txConfig.getRetryInterval();
this.txConfig = txConfig;
SerializeProtocolEnum serializeProtocol =
SerializeProtocolEnum.acquireSerializeProtocol(txConfig.getNettySerializer());
nettyClientHandlerInitializer.setSerializeProtocolEnum(serializeProtocol);
servletExecutor = new DefaultEventExecutorGroup(txConfig.getNettyThreadMax());
nettyClientHandlerInitializer.setServletExecutor(servletExecutor);
nettyClientHandlerInitializer.setTxConfig(txConfig);
try {
bootstrap = new Bootstrap();
groups(bootstrap, txConfig);
doConnect();
} catch (Exception e) {
LogUtil.error(LOGGER, "Lottor client start failed for【{}】", () -> e.getLocalizedMessage());
throw new TransactionRuntimeException(e);
}
}
/**
* 启动netty服务
*/
@Override
public void start() {
SocketManager.getInstance().setMaxConnection(nettyConfig.getMaxConnection());
servletExecutor = new DefaultEventExecutorGroup(MAX_THREADS);
if (nettyConfig.getMaxThreads() != 0) {
MAX_THREADS = nettyConfig.getMaxThreads();
}
try {
final SerializeProtocolEnum serializeProtocolEnum =
SerializeProtocolEnum.acquireSerializeProtocol(nettyConfig.getSerialize());
nettyServerHandlerInitializer.setSerializeProtocolEnum(serializeProtocolEnum);
nettyServerHandlerInitializer.setServletExecutor(servletExecutor);
ServerBootstrap b = new ServerBootstrap();
groups(b, MAX_THREADS << 1);
b.bind(nettyConfig.getPort());
LOGGER.info("netty service started on port: " + nettyConfig.getPort());
} catch (Exception e) {
e.printStackTrace();
}
}
public synchronized void start(NettyClientConfig config) {
if (started.get()) {
return;
}
initHandler();
Bootstrap bootstrap = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup(1, new DefaultThreadFactory(clientName + "-boss"));
eventExecutors = new DefaultEventExecutorGroup(config.getClientWorkerThreads(), new DefaultThreadFactory(clientName + "-worker"));
connectManager = new NettyConnectManageHandler(bootstrap, config.getConnectTimeoutMillis());
bootstrap.group(this.eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, config.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, config.getClientSocketRcvBufSize())
.handler(newChannelInitializer(config, eventExecutors, connectManager));
started.set(true);
}
@Override
public void start(final TxConfig txConfig) {
SerializeProtocolEnum serializeProtocol =
SerializeProtocolEnum.acquireSerializeProtocol(txConfig.getNettySerializer());
nettyClientHandlerInitializer.setSerializeProtocolEnum(serializeProtocol);
servletExecutor = new DefaultEventExecutorGroup(txConfig.getNettyThreadMax());
nettyClientHandlerInitializer.setServletExecutor(servletExecutor);
nettyClientHandlerInitializer.setTxConfig(txConfig);
TxManagerLocator.getInstance().setTxConfig(txConfig);
TxManagerLocator.getInstance().schedulePeriodicRefresh();
try {
bootstrap = new Bootstrap();
groups(bootstrap, txConfig.getNettyThreadMax());
doConnect();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void start() throws EmbeddedServletContainerException {
ServerBootstrap b = new ServerBootstrap();
groups(b);
servletExecutor = new DefaultEventExecutorGroup(50);
b.childHandler(new NettyEmbeddedServletInitializer(servletExecutor, context));
// Don't yet need the complexity of lifecycle state, listeners etc, so tell the context it's initialised here
context.setInitialised(true);
ChannelFuture future = b.bind(address).awaitUninterruptibly();
//noinspection ThrowableResultOfMethodCallIgnored
Throwable cause = future.cause();
if (null != cause) {
throw new EmbeddedServletContainerException("Could not start Netty server", cause);
}
logger.info(context.getServerInfo() + " started on port: " + getPort());
}
@BeforeClass
public static void createGroup() {
logger.info("Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor +
" StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check);
group = new DefaultEventExecutorGroup(8);
groupForGlobal = new DefaultEventExecutorGroup(8);
}
public NettyClient(IServiceDiscovery serviceDiscovery) {
this.semaphoreOneway = new Semaphore(1000, true);
this.semaphoreAsync = new Semaphore(1000, true);
this.serviceDiscovery = serviceDiscovery;
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new NamedThreadFactory("NettyClientSelector_"));
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory("NettyClientWorkerThread_"));
this.bootstrap = createBootstrap();
startScanResponseTableSchedule();
runEventListener();
}
@Override
protected ChannelInitializer<SocketChannel> newChannelInitializer(final NettyClientConfig config, final DefaultEventExecutorGroup eventExecutors, final NettyConnectManageHandler connectManager) {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(eventExecutors,
new EncodeHandler(),
new DecodeHandler(false),
new IdleStateHandler(0, 0, config.getClientChannelMaxIdleTimeSeconds()),
connectManager,
clientHandler);
}
};
}
@Override
protected ChannelInitializer<SocketChannel> newChannelInitializer(final NettyClientConfig config, final DefaultEventExecutorGroup eventExecutors, final NettyConnectManageHandler connectManager) {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(eventExecutors,
new EncodeHandler(),
new DecodeHandler(config.isServer()),
new IdleStateHandler(0, 0, config.getClientChannelMaxIdleTimeSeconds()),
connectManager,
clientHandler);
}
};
}
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
getThreadCount(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Netty Default Event Client Thread.." + this.threadIndex.incrementAndGet());
}
});
}
@Override
public void start() throws InterruptedException {
SocketManager.getInstance().setMaxConnection(nettyConfig.getMaxConnection());
if (nettyConfig.getMaxThreads() != 0) {
maxThread = nettyConfig.getMaxThreads();
}
servletExecutor = new DefaultEventExecutorGroup(maxThread);
final SerializeProtocolEnum serializeProtocolEnum =
SerializeProtocolEnum.acquireSerializeProtocol(nettyConfig.getSerialize());
nettyServerHandlerInitializer.setSerializeProtocolEnum(serializeProtocolEnum);
nettyServerHandlerInitializer.setServletExecutor(servletExecutor);
ServerBootstrap b = new ServerBootstrap();
bossGroup = createEventLoopGroup();
if (bossGroup instanceof EpollEventLoopGroup) {
groupsEpoll(b, maxThread);
} else {
groupsNio(b, maxThread);
}
try {
LOGGER.info("netty service started on port: " + nettyConfig.getPort());
ChannelFuture future = b.bind(nettyConfig.getPort()).sync();
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
servletExecutor.shutdownGracefully();
}
}
@Default
default EventExecutorGroup executor() {
Logger logger = LoggerFactory.getLogger("imap-executor");
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setUncaughtExceptionHandler((t, e) -> logger.error("Uncaught exception on thread {}", t.getName(), e))
.setNameFormat("imap-executor-%d")
.build();
int nThreads = Runtime.getRuntime().availableProcessors() * 2;
return new DefaultEventExecutorGroup(nThreads, threadFactory);
}
@Override
public void init(Configuration configuration) {
components = new Components();
components.init(configuration);
initHandler();
host = configuration.getString(Configurations.CFG_BIND_HOST, Configurations.DEFAULT_BIND_HOST);
port = configuration.getInteger(Configurations.CFG_BIND_PORT, Configurations.DEFAULT_BIND_PORT);
int bossCore = configuration.getInteger(Configurations.CFG_BOSS_CORE, Configurations.DEFAULT_BOSS_CORE);
int workerCore = configuration.getInteger(Configurations.CFG_WORKER_CORE, Configurations.DEFAULT_WORKER_CORE);
boss = new NioEventLoopGroup(bossCore);
worker = new NioEventLoopGroup(workerCore);
businessGroup = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors());
}
@BeforeClass
public static void createGroup() {
logger.info("Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor +
" StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check);
group = new DefaultEventExecutorGroup(8);
groupForGlobal = new DefaultEventExecutorGroup(8);
}
BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) {
// Keeps a local copy of Server settings, so they can't be modified once it starts listening
this.localMessageListener = messageListener;
this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
}
/**
* Invoke jar receive handler in case of HA.
*
* @param ctx the ctx
*/
private void invokeJarReceiveHandlerForHA(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
EventExecutor e1 = new DefaultEventExecutorGroup(1).next();
p.addLast("stringDecoder", new StringDecoder());
p.addLast("delegator", new Delegator(receiveDirectory));
p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(),
JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
p.addLast(e1, new ArchiveEncoder());
p.remove(this);
}
/**
* Invoke jar send handler.
*
* @param ctx the ctx
*/
private void invokeJarSendHandlerForHA(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
EventExecutor e1 = new DefaultEventExecutorGroup(1).next();
p.addLast(e1, new ArchiveDecoder(10485760, receiveDirectory));
p.addLast(ACK_RESPONSER, new AckResponser());
p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(),
JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
p.addLast(ENCODER, new StringEncoder());
p.remove(this);
}
/**
* Invoke log files receive handler.
*
* @param ctx the ctx
*/
private void invokeLogFilesReceiveHandlerForHA(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
EventExecutor e1 = new DefaultEventExecutorGroup(1).next();
p.addLast("stringDecoder", new StringDecoder());
p.addLast("delegator", new Delegator(receiveDirectory));
p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(),
JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
p.addLast("stringEncoder", new StringEncoder());
p.addLast(e1, new LogFilesEncoder());
p.remove(this);
}
/**
* Invoke fire and get object response command handler for HA
*
* @param ctx the ctx
*/
private void invokeFireAndGetObjectResponseCommandHandlerForHA(
ChannelHandlerContext ctx) {
EventExecutor e1 = new DefaultEventExecutorGroup(1).next();
ChannelPipeline p = ctx.pipeline();
p.addLast(DECODER, new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(),
JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
p.addLast(e1, new CommandAsObjectResponserHA());
p.addLast(ENCODER, new ObjectEncoder());
p.remove(this);
}
/**
* Invoke fire and forget command handler for HA
*
* @param ctx the ctx
*/
private void invokeFireAndForgetCommandHandlerForHA(ChannelHandlerContext ctx) {
EventExecutor e1 = new DefaultEventExecutorGroup(1).next();
ChannelPipeline p = ctx.pipeline();
p.addLast(DECODER, new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(),
JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
p.addLast(e1, new CommandDelegator());
p.addLast(ENCODER, new ObjectEncoder());
p.remove(this);
}
/**
* Invoke log files send handler.
*
* @param ctx the ctx
*/
private void invokeLogFilesSendHandlerForHA(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
EventExecutor e1 = new DefaultEventExecutorGroup(1).next();
p.addLast(e1, new LogFilesDecoder(receiveDirectory));
p.addLast(ACK_RESPONSER, new AckResponser());
p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(),
JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
p.addLast(ENCODER, new StringEncoder());
p.remove(this);
}
@Override
protected void clientStart() throws RemotingException {
NettyLogger.setNettyLoggerFactory();
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
remotingClientConfig.getClientWorkerThreads(),
new NamedThreadFactory("NettyClientWorkerThread_")
);
final NettyCodecFactory nettyCodecFactory = new NettyCodecFactory(appContext, getCodec());
this.bootstrap.group(this.eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
nettyCodecFactory.getEncoder(),
nettyCodecFactory.getDecoder(),
new IdleStateHandler(remotingClientConfig.getReaderIdleTimeSeconds(), remotingClientConfig.getWriterIdleTimeSeconds(), remotingClientConfig.getClientChannelMaxIdleTimeSeconds()),//
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
}
@Override
protected void serverStart() throws RemotingException {
NettyLogger.setNettyLoggerFactory();
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
remotingServerConfig.getServerWorkerThreads(),
new NamedThreadFactory("NettyServerWorkerThread_")
);
final NettyCodecFactory nettyCodecFactory = new NettyCodecFactory(appContext, getCodec());
this.serverBootstrap.group(this.bossSelectorGroup, this.workerSelectorGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 65536)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new InetSocketAddress(this.remotingServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
nettyCodecFactory.getEncoder(),
nettyCodecFactory.getDecoder(),
new IdleStateHandler(remotingServerConfig.getReaderIdleTimeSeconds(),
remotingServerConfig.getWriterIdleTimeSeconds(), remotingServerConfig.getServerChannelMaxIdleTimeSeconds()),//
new NettyConnectManageHandler(), //
new NettyServerHandler());
}
});
try {
this.serverBootstrap.bind().sync();
} catch (InterruptedException e) {
throw new RemotingException("Start Netty server bootstrap error", e);
}
}
/**
* @deprecated use {@link #NettyHttpServletPipelineFactory(TLSServerParameters, boolean, int, Map,
* NettyHttpServerEngine, EventExecutorGroup)}
*/
@Deprecated
public NettyHttpServletPipelineFactory(TLSServerParameters tlsServerParameters,
boolean supportSession, int threadPoolSize, int maxChunkContentSize,
Map<String, NettyHttpContextHandler> handlerMap,
NettyHttpServerEngine engine) {
this(tlsServerParameters, supportSession, maxChunkContentSize, handlerMap, engine,
new DefaultEventExecutorGroup(threadPoolSize));
}
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
nettyClientConfig.getClientWorkerThreads(), //
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
//
.option(ChannelOption.TCP_NODELAY, true)
//
.option(ChannelOption.SO_KEEPALIVE, false)
//
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
//
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
//
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(//
defaultEventExecutorGroup, //
new NettyEncoder(), //
new NettyDecoder(), //
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), //
new NettyConnetManageHandler(), //
new NettyClientHandler());
}
});
// 每隔1秒扫描下异步调用超时情况
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
}
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
nettyServerConfig.getServerWorkerThreads(), //
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
ServerBootstrap childHandler = //
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NioServerSocketChannel.class)
//
.option(ChannelOption.SO_BACKLOG, 1024)
//
.option(ChannelOption.SO_REUSEADDR, true)
//
.option(ChannelOption.SO_KEEPALIVE, false)
//
.childOption(ChannelOption.TCP_NODELAY, true)
//
.option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
//
.option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
//
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//
defaultEventExecutorGroup, //
new NettyEncoder(), //
new NettyDecoder(), //
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //
new NettyConnetManageHandler(), //
new NettyServerHandler());
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
// 这个选项有可能会占用大量堆外内存,暂时不使用。
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress)sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
// 每隔1秒扫描下异步调用超时情况
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
log.info("Started Netty Remote Server on " + new Date() + ", The port is:" + this.port);
}