类io.netty.util.concurrent.DefaultEventExecutorGroup源码实例Demo

下面列出了怎么用io.netty.util.concurrent.DefaultEventExecutorGroup的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: netty-4.1.22   文件: DefaultChannelPipelineTest.java
/**
 * Checks that an exception thrown from {@code handlerAdded(...)} of one handler reaches a handler
 * that is bound to a separate {@link EventExecutorGroup}.
 *
 * <p>The promise and the expected exception are handed to {@code CheckExceptionHandler};
 * presumably that handler completes the promise once it sees the exception (its source is not
 * part of this snippet). The test then waits on the promise, bounded by the 3 second timeout.
 */
@Test(timeout = 3000)
public void testHandlerAddedExceptionFromChildHandlerIsPropagated() {
    final EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(1);
    try {
        final Promise<Void> propagated = executorGroup.next().newPromise();
        final AtomicBoolean added = new AtomicBoolean();
        final Exception expected = new RuntimeException();

        // Handler whose handlerAdded(...) records the call and then fails.
        final ChannelHandlerAdapter throwingHandler = new ChannelHandlerAdapter() {
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                added.set(true);
                throw expected;
            }
        };

        ChannelPipeline pipeline = new LocalChannel().pipeline();
        pipeline.addLast(executorGroup, new CheckExceptionHandler(expected, propagated));
        pipeline.addFirst(throwingHandler);

        // Nothing has been registered yet; the test expects handlerAdded(...) not to have run.
        assertFalse(added.get());
        // `group` is declared outside this snippet.
        group.register(pipeline.channel());
        propagated.syncUninterruptibly();
    } finally {
        executorGroup.shutdownGracefully();
    }
}
 
源代码2 项目: netty-4.1.22   文件: DefaultChannelPipelineTest.java
/**
 * Checks that an exception thrown from {@code handlerRemoved(...)} of one handler reaches a
 * handler that is bound to a separate {@link EventExecutorGroup}.
 *
 * <p>The promise and the expected exception are handed to {@code CheckExceptionHandler};
 * presumably that handler completes the promise once it sees the exception (its source is not
 * part of this snippet).
 */
@Test(timeout = 3000)
public void testHandlerRemovedExceptionFromChildHandlerIsPropagated() {
    final EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(1);
    try {
        final Promise<Void> propagated = executorGroup.next().newPromise();
        final String throwingHandlerName = "foo";
        final Exception expected = new RuntimeException();

        // Handler that fails as soon as it is removed from the pipeline.
        final ChannelHandlerAdapter throwingHandler = new ChannelHandlerAdapter() {
            @Override
            public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                throw expected;
            }
        };

        ChannelPipeline pipeline = new LocalChannel().pipeline();
        pipeline.addLast(throwingHandlerName, throwingHandler);
        pipeline.addLast(executorGroup, new CheckExceptionHandler(expected, propagated));

        // `group` is declared outside this snippet; wait for registration before removing.
        group.register(pipeline.channel()).syncUninterruptibly();
        pipeline.remove(throwingHandlerName);
        propagated.syncUninterruptibly();
    } finally {
        executorGroup.shutdownGracefully();
    }
}
 
源代码3 项目: netty-4.1.22   文件: DefaultChannelPipelineTest.java
/**
 * Verifies that two handlers added to the same pipeline with the same
 * {@link EventExecutorGroup} report the same {@link EventExecutor}, while a handler added to a
 * second pipeline with that group reports a different one.
 */
@Test
public void testPinExecutor() {
    EventExecutorGroup group = new DefaultEventExecutorGroup(2);
    try {
        ChannelPipeline pipeline = new LocalChannel().pipeline();
        ChannelPipeline pipeline2 = new LocalChannel().pipeline();

        pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter());
        pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter());
        pipeline2.addLast(group, "h3", new ChannelInboundHandlerAdapter());

        EventExecutor executor1 = pipeline.context("h1").executor();
        EventExecutor executor2 = pipeline.context("h2").executor();
        assertNotNull(executor1);
        assertNotNull(executor2);
        assertSame(executor1, executor2);
        EventExecutor executor3 = pipeline2.context("h3").executor();
        assertNotNull(executor3);
        assertNotSame(executor3, executor2);
    } finally {
        // Release the executor threads even when one of the assertions above fails.
        group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
    }
}
 
源代码4 项目: netty-4.1.22   文件: DefaultChannelPipelineTest.java
/**
 * Verifies that with {@code ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP} set to {@code false}
 * two handlers added to the same pipeline with the same {@link EventExecutorGroup} report
 * different {@link EventExecutor}s.
 */
@Test
public void testNotPinExecutor() {
    EventExecutorGroup group = new DefaultEventExecutorGroup(2);
    try {
        ChannelPipeline pipeline = new LocalChannel().pipeline();
        pipeline.channel().config().setOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false);

        pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter());
        pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter());

        EventExecutor executor1 = pipeline.context("h1").executor();
        EventExecutor executor2 = pipeline.context("h2").executor();
        assertNotNull(executor1);
        assertNotNull(executor2);
        assertNotSame(executor1, executor2);
    } finally {
        // Release the executor threads even when one of the assertions above fails.
        group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
    }
}
 
源代码5 项目: Lottor   文件: NettyClientServiceImpl.java
/**
 * Starts the netty client.
 *
 * <p>Copies the retry settings from the configuration, hands the serializer, the handler
 * executor group and the configuration to the channel initializer, then creates the bootstrap
 * and connects.
 *
 * @param txConfig client configuration
 * @throws TransactionRuntimeException wrapping any exception raised while bootstrapping or
 *         connecting
 */
@Override
public void start(TxConfig txConfig) {
    this.retryMax = txConfig.getRetryMax();
    this.retryInterval = txConfig.getRetryInterval();
    this.txConfig = txConfig;

    nettyClientHandlerInitializer.setSerializeProtocolEnum(
            SerializeProtocolEnum.acquireSerializeProtocol(txConfig.getNettySerializer()));

    servletExecutor = new DefaultEventExecutorGroup(txConfig.getNettyThreadMax());
    nettyClientHandlerInitializer.setServletExecutor(servletExecutor);
    nettyClientHandlerInitializer.setTxConfig(txConfig);

    try {
        bootstrap = new Bootstrap();
        groups(bootstrap, txConfig);
        doConnect();
    } catch (Exception startFailure) {
        LogUtil.error(LOGGER, "Lottor client start failed for【{}】", () -> startFailure.getLocalizedMessage());
        throw new TransactionRuntimeException(startFailure);
    }
}
 
源代码6 项目: Lottor   文件: NettyServerServiceImpl.java
/**
 * Starts the netty server.
 *
 * <p>Applies the configured connection limit, resolves the worker thread count, creates the
 * handler executor group, hands the serializer and executor to the channel initializer and
 * binds the configured port. Exceptions raised during setup are printed and not rethrown.
 */
@Override
public void start() {
    SocketManager.getInstance().setMaxConnection(nettyConfig.getMaxConnection());
    // Resolve the thread count BEFORE creating the executor group so the group honours the
    // configured value; a configured value of 0 keeps the current MAX_THREADS.
    if (nettyConfig.getMaxThreads() != 0) {
        MAX_THREADS = nettyConfig.getMaxThreads();
    }
    servletExecutor = new DefaultEventExecutorGroup(MAX_THREADS);
    try {
        final SerializeProtocolEnum serializeProtocolEnum =
                SerializeProtocolEnum.acquireSerializeProtocol(nettyConfig.getSerialize());

        nettyServerHandlerInitializer.setSerializeProtocolEnum(serializeProtocolEnum);
        nettyServerHandlerInitializer.setServletExecutor(servletExecutor);
        ServerBootstrap b = new ServerBootstrap();
        // The event loop groups are sized at twice the handler thread count.
        groups(b, MAX_THREADS << 1);
        b.bind(nettyConfig.getPort());

        LOGGER.info("netty service started on port: " + nettyConfig.getPort());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
 
源代码7 项目: qmq   文件: AbstractNettyClient.java
/**
 * Starts the client. Returns immediately when {@code started} is already set.
 *
 * <p>Creates a one-thread NIO event loop group, the worker executor group and the connect
 * manager, configures the bootstrap with the socket options taken from {@code config}, and
 * finally sets {@code started}.
 *
 * @param config client settings (worker thread count, connect timeout, socket buffer sizes)
 */
public synchronized void start(NettyClientConfig config) {
    if (started.get()) {
        return;
    }

    initHandler();

    final Bootstrap clientBootstrap = new Bootstrap();

    final DefaultThreadFactory bossThreads = new DefaultThreadFactory(clientName + "-boss");
    eventLoopGroup = new NioEventLoopGroup(1, bossThreads);

    final int workerCount = config.getClientWorkerThreads();
    final DefaultThreadFactory workerThreads = new DefaultThreadFactory(clientName + "-worker");
    eventExecutors = new DefaultEventExecutorGroup(workerCount, workerThreads);

    connectManager = new NettyConnectManageHandler(clientBootstrap, config.getConnectTimeoutMillis());

    clientBootstrap.group(this.eventLoopGroup);
    clientBootstrap.channel(NioSocketChannel.class);
    clientBootstrap.option(ChannelOption.TCP_NODELAY, true);
    clientBootstrap.option(ChannelOption.SO_KEEPALIVE, false);
    clientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeoutMillis());
    clientBootstrap.option(ChannelOption.SO_SNDBUF, config.getClientSocketSndBufSize());
    clientBootstrap.option(ChannelOption.SO_RCVBUF, config.getClientSocketRcvBufSize());
    clientBootstrap.handler(newChannelInitializer(config, eventExecutors, connectManager));

    started.set(true);
}
 
源代码8 项目: Raincat   文件: NettyClientServiceImpl.java
/**
 * Starts the netty client.
 *
 * <p>Hands the serializer, the handler executor group and the configuration to the channel
 * initializer, passes the configuration to {@code TxManagerLocator} and starts its periodic
 * refresh, then creates the bootstrap and connects. Exceptions raised while bootstrapping or
 * connecting are printed and not rethrown.
 *
 * @param txConfig client configuration
 */
@Override
public void start(final TxConfig txConfig) {
    nettyClientHandlerInitializer.setSerializeProtocolEnum(
            SerializeProtocolEnum.acquireSerializeProtocol(txConfig.getNettySerializer()));

    servletExecutor = new DefaultEventExecutorGroup(txConfig.getNettyThreadMax());
    nettyClientHandlerInitializer.setServletExecutor(servletExecutor);
    nettyClientHandlerInitializer.setTxConfig(txConfig);

    TxManagerLocator.getInstance().setTxConfig(txConfig);
    TxManagerLocator.getInstance().schedulePeriodicRefresh();

    try {
        bootstrap = new Bootstrap();
        groups(bootstrap, txConfig.getNettyThreadMax());
        doConnect();
    } catch (Exception startFailure) {
        startFailure.printStackTrace();
    }
}
 
/**
 * Boots the embedded Netty server and waits until the bind attempt has finished.
 *
 * <p>The servlet handlers run on a {@link DefaultEventExecutorGroup} of 50 threads.
 *
 * @throws EmbeddedServletContainerException if the bind future completes with a cause
 */
@Override
public void start() throws EmbeddedServletContainerException {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    groups(serverBootstrap);
    servletExecutor = new DefaultEventExecutorGroup(50);
    serverBootstrap.childHandler(new NettyEmbeddedServletInitializer(servletExecutor, context));

    // Lifecycle state and listeners are not needed yet, so the context is simply flagged as
    // initialised at this point.
    context.setInitialised(true);

    ChannelFuture bindFuture = serverBootstrap.bind(address).awaitUninterruptibly();
    //noinspection ThrowableResultOfMethodCallIgnored
    Throwable bindFailure = bindFuture.cause();
    if (bindFailure != null) {
        throw new EmbeddedServletContainerException("Could not start Netty server", bindFailure);
    }
    logger.info(context.getServerInfo() + " started on port: " + getPort());
}
 
源代码10 项目: netty-4.1.22   文件: TrafficShapingHandlerTest.java
/**
 * Logs the bandwidth and timing parameters of the test class and creates the two 8-thread
 * executor groups, {@code group} and {@code groupForGlobal}.
 */
@BeforeClass
public static void createGroup() {
    final String settings = "Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor +
                " StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check;
    logger.info(settings);

    group = new DefaultEventExecutorGroup(8);
    groupForGlobal = new DefaultEventExecutorGroup(8);
}
 
源代码11 项目: rpcx-java   文件: NettyClient.java
public NettyClient(IServiceDiscovery serviceDiscovery) {
    this.semaphoreOneway = new Semaphore(1000, true);
    this.semaphoreAsync = new Semaphore(1000, true);
    this.serviceDiscovery = serviceDiscovery;
    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new NamedThreadFactory("NettyClientSelector_"));
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory("NettyClientWorkerThread_"));

    this.bootstrap = createBootstrap();

    startScanResponseTableSchedule();
    runEventListener();
}
 
源代码12 项目: qmq   文件: MetaInfoClientNettyImpl.java
/**
 * Builds the initializer that installs the client handlers on every new channel. All handlers
 * are registered with {@code eventExecutors}.
 *
 * @param config         supplies the idle timeout for the {@link IdleStateHandler}
 * @param eventExecutors executor group the handlers are added with
 * @param connectManager connection management handler placed before {@code clientHandler}
 * @return the channel initializer
 */
@Override
protected ChannelInitializer<SocketChannel> newChannelInitializer(final NettyClientConfig config, final DefaultEventExecutorGroup eventExecutors, final NettyConnectManageHandler connectManager) {
    return new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            final EncodeHandler encoder = new EncodeHandler();
            final DecodeHandler decoder = new DecodeHandler(false);
            // Only the all-idle timeout is enabled; reader and writer idle times are 0.
            final IdleStateHandler idleState =
                    new IdleStateHandler(0, 0, config.getClientChannelMaxIdleTimeSeconds());

            ch.pipeline().addLast(eventExecutors, encoder, decoder, idleState, connectManager, clientHandler);
        }
    };
}
 
源代码13 项目: qmq   文件: NettyClient.java
/**
 * Builds the initializer that installs the client handlers on every new channel. All handlers
 * are registered with {@code eventExecutors}.
 *
 * @param config         supplies the decoder's server flag and the idle timeout
 * @param eventExecutors executor group the handlers are added with
 * @param connectManager connection management handler placed before {@code clientHandler}
 * @return the channel initializer
 */
@Override
protected ChannelInitializer<SocketChannel> newChannelInitializer(final NettyClientConfig config, final DefaultEventExecutorGroup eventExecutors, final NettyConnectManageHandler connectManager) {
    return new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            final EncodeHandler encoder = new EncodeHandler();
            final DecodeHandler decoder = new DecodeHandler(config.isServer());
            // Only the all-idle timeout is enabled; reader and writer idle times are 0.
            final IdleStateHandler idleState =
                    new IdleStateHandler(0, 0, config.getClientChannelMaxIdleTimeSeconds());

            ch.pipeline().addLast(eventExecutors, encoder, decoder, idleState, connectManager, clientHandler);
        }
    };
}
 
源代码14 项目: timer   文件: AbstractClient.java
/**
 * Creates the client's executor group with {@code getThreadCount()} threads. Threads are named
 * {@code "Netty Default Event Client Thread.."} followed by a counter that starts at 1.
 */
@Override
public void start() {
    final int threadCount = getThreadCount();
    final ThreadFactory namedThreads = new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            final int index = this.threadIndex.incrementAndGet();
            return new Thread(r, "Netty Default Event Client Thread.." + index);
        }
    };
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(threadCount, namedThreads);
}
 
源代码15 项目: Raincat   文件: NettyServerServiceImpl.java
/**
 * Starts the netty server and blocks until the server channel is closed.
 *
 * <p>A configured max-thread value other than 0 replaces {@code maxThread}. The bootstrap is set
 * up through {@code groupsEpoll} or {@code groupsNio}, depending on the type of the boss group
 * returned by {@code createEventLoopGroup()}. When the server channel closes, or binding fails,
 * the worker group, boss group and handler executor group are shut down.
 *
 * @throws InterruptedException if interrupted while waiting for the bind or for the close
 */
@Override
public void start() throws InterruptedException {
    SocketManager.getInstance().setMaxConnection(nettyConfig.getMaxConnection());

    final int configuredThreads = nettyConfig.getMaxThreads();
    if (configuredThreads != 0) {
        maxThread = configuredThreads;
    }
    servletExecutor = new DefaultEventExecutorGroup(maxThread);

    nettyServerHandlerInitializer.setSerializeProtocolEnum(
            SerializeProtocolEnum.acquireSerializeProtocol(nettyConfig.getSerialize()));
    nettyServerHandlerInitializer.setServletExecutor(servletExecutor);

    ServerBootstrap server = new ServerBootstrap();
    bossGroup = createEventLoopGroup();
    final boolean epoll = bossGroup instanceof EpollEventLoopGroup;
    if (!epoll) {
        groupsNio(server, maxThread);
    } else {
        groupsEpoll(server, maxThread);
    }

    try {
        // Note: this message is logged before the bind has completed.
        LOGGER.info("netty service started on port: " + nettyConfig.getPort());
        ChannelFuture bound = server.bind(nettyConfig.getPort()).sync();
        bound.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
        servletExecutor.shutdownGracefully();
    }

}
 
/**
 * Default executor group: daemon threads named {@code imap-executor-N}, two per available
 * processor. Uncaught exceptions are logged to the {@code "imap-executor"} logger.
 *
 * @return a new {@link DefaultEventExecutorGroup}
 */
@Default
default EventExecutorGroup executor() {
  final Logger executorLog = LoggerFactory.getLogger("imap-executor");
  final int threadCount = Runtime.getRuntime().availableProcessors() * 2;

  final ThreadFactory daemonThreads = new ThreadFactoryBuilder()
      .setNameFormat("imap-executor-%d")
      .setDaemon(true)
      .setUncaughtExceptionHandler(
          (thread, error) -> executorLog.error("Uncaught exception on thread {}", thread.getName(), error))
      .build();

  return new DefaultEventExecutorGroup(threadCount, daemonThreads);
}
 
源代码17 项目: awacs   文件: ServerEntry.java
/**
 * Initializes the server entry from the given configuration: components, handler, bind
 * host/port, the boss and worker NIO event loop groups, and a business executor group with one
 * thread per available processor.
 *
 * @param configuration source of the bind address and the boss/worker thread counts
 */
@Override
public void init(Configuration configuration) {
    components = new Components();
    components.init(configuration);
    initHandler();

    host = configuration.getString(Configurations.CFG_BIND_HOST, Configurations.DEFAULT_BIND_HOST);
    port = configuration.getInteger(Configurations.CFG_BIND_PORT, Configurations.DEFAULT_BIND_PORT);

    final int bossThreads =
            configuration.getInteger(Configurations.CFG_BOSS_CORE, Configurations.DEFAULT_BOSS_CORE);
    final int workerThreads =
            configuration.getInteger(Configurations.CFG_WORKER_CORE, Configurations.DEFAULT_WORKER_CORE);
    boss = new NioEventLoopGroup(bossThreads);
    worker = new NioEventLoopGroup(workerThreads);

    final int processors = Runtime.getRuntime().availableProcessors();
    businessGroup = new DefaultEventExecutorGroup(processors);
}
 
/**
 * Logs the bandwidth and timing parameters of the test class and creates the two 8-thread
 * executor groups, {@code group} and {@code groupForGlobal}.
 */
@BeforeClass
public static void createGroup() {
    final String settings = "Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor +
                " StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check;
    logger.info(settings);

    group = new DefaultEventExecutorGroup(8);
    groupForGlobal = new DefaultEventExecutorGroup(8);
}
 
源代码19 项目: logstash-input-beats   文件: Server.java
/**
 * Creates the initializer and its two executor groups: one sized by
 * {@code DEFAULT_IDLESTATEHANDLER_THREAD}, one sized by {@code beatsHandlerThread}.
 *
 * @param messageListener                listener stored locally
 * @param clientInactivityTimeoutSeconds inactivity timeout stored locally
 * @param beatsHandlerThread             thread count of the beats handler executor group
 */
BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) {
    // Snapshot the Server settings so later changes cannot affect a listening server.
    this.localMessageListener = messageListener;
    this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;

    this.idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
    this.beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
}
 
源代码20 项目: jumbune   文件: JumbuneAgentDecoder.java
/**
 * Invoke jar receive handler in case of HA.
 *
 * <p>Appends the string decoder, delegator, heartbeat handler and an {@code ArchiveEncoder}
 * (bound to an executor from a dedicated one-thread group) to the pipeline, then removes this
 * decoder from it.
 *
 * @param ctx the ctx
 */
private void invokeJarReceiveHandlerForHA(ChannelHandlerContext ctx) {
	ChannelPipeline p = ctx.pipeline();
	final DefaultEventExecutorGroup executorGroup = new DefaultEventExecutorGroup(1);
	// A new group is created on every call; shut it down when the channel closes so that
	// its thread does not outlive the connection.
	ctx.channel().closeFuture().addListener(new io.netty.channel.ChannelFutureListener() {
		@Override
		public void operationComplete(io.netty.channel.ChannelFuture future) {
			executorGroup.shutdownGracefully();
		}
	});
	EventExecutor e1 = executorGroup.next();
	p.addLast("stringDecoder", new StringDecoder());
	p.addLast("delegator", new Delegator(receiveDirectory));
	p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(), 
			JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
	p.addLast(e1, new ArchiveEncoder());
	p.remove(this);
}
 
源代码21 项目: jumbune   文件: JumbuneAgentDecoder.java
/**
 * Invoke jar send handler.
 *
 * <p>Appends an {@code ArchiveDecoder} (bound to an executor from a dedicated one-thread group),
 * the ack responder, heartbeat handler and string encoder to the pipeline, then removes this
 * decoder from it.
 *
 * @param ctx the ctx
 */
private void invokeJarSendHandlerForHA(ChannelHandlerContext ctx) {
	ChannelPipeline p = ctx.pipeline();
	final DefaultEventExecutorGroup executorGroup = new DefaultEventExecutorGroup(1);
	// A new group is created on every call; shut it down when the channel closes so that
	// its thread does not outlive the connection.
	ctx.channel().closeFuture().addListener(new io.netty.channel.ChannelFutureListener() {
		@Override
		public void operationComplete(io.netty.channel.ChannelFuture future) {
			executorGroup.shutdownGracefully();
		}
	});
	EventExecutor e1 = executorGroup.next();
	p.addLast(e1, new ArchiveDecoder(10485760, receiveDirectory));
	p.addLast(ACK_RESPONSER, new AckResponser());
	p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(), 
			JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
	p.addLast(ENCODER, new StringEncoder());
	p.remove(this);
}
 
源代码22 项目: jumbune   文件: JumbuneAgentDecoder.java
/**
 * Invoke log files receive handler.
 *
 * <p>Appends the string decoder, delegator, heartbeat handler, string encoder and a
 * {@code LogFilesEncoder} (bound to an executor from a dedicated one-thread group) to the
 * pipeline, then removes this decoder from it.
 *
 * @param ctx the ctx
 */
private void invokeLogFilesReceiveHandlerForHA(ChannelHandlerContext ctx) {
	ChannelPipeline p = ctx.pipeline();
	final DefaultEventExecutorGroup executorGroup = new DefaultEventExecutorGroup(1);
	// A new group is created on every call; shut it down when the channel closes so that
	// its thread does not outlive the connection.
	ctx.channel().closeFuture().addListener(new io.netty.channel.ChannelFutureListener() {
		@Override
		public void operationComplete(io.netty.channel.ChannelFuture future) {
			executorGroup.shutdownGracefully();
		}
	});
	EventExecutor e1 = executorGroup.next();
	p.addLast("stringDecoder", new StringDecoder());
	p.addLast("delegator", new Delegator(receiveDirectory));
	p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(), 
			JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
	p.addLast("stringEncoder", new StringEncoder());
	p.addLast(e1, new LogFilesEncoder());
	p.remove(this);
}
 
源代码23 项目: jumbune   文件: JumbuneAgentDecoder.java
/**
 * Invoke fire and get object response command handler for HA
 *
 * <p>Appends the object decoder, heartbeat handler, a {@code CommandAsObjectResponserHA} (bound
 * to an executor from a dedicated one-thread group) and the object encoder to the pipeline,
 * then removes this decoder from it.
 *
 * @param ctx the ctx
 */
private void invokeFireAndGetObjectResponseCommandHandlerForHA(
		ChannelHandlerContext ctx) {
	final DefaultEventExecutorGroup executorGroup = new DefaultEventExecutorGroup(1);
	// A new group is created on every call; shut it down when the channel closes so that
	// its thread does not outlive the connection.
	ctx.channel().closeFuture().addListener(new io.netty.channel.ChannelFutureListener() {
		@Override
		public void operationComplete(io.netty.channel.ChannelFuture future) {
			executorGroup.shutdownGracefully();
		}
	});
	EventExecutor e1 = executorGroup.next();
	ChannelPipeline p = ctx.pipeline();
	p.addLast(DECODER, new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
	p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(), 
			JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
	p.addLast(e1, new CommandAsObjectResponserHA());
	p.addLast(ENCODER, new ObjectEncoder());
	p.remove(this);
}
 
源代码24 项目: jumbune   文件: JumbuneAgentDecoder.java
/**
 * Invoke fire and forget command handler for HA
 *
 * <p>Appends the object decoder, heartbeat handler, a {@code CommandDelegator} (bound to an
 * executor from a dedicated one-thread group) and the object encoder to the pipeline, then
 * removes this decoder from it.
 *
 * @param ctx the ctx
 */
private void invokeFireAndForgetCommandHandlerForHA(ChannelHandlerContext ctx) {
	final DefaultEventExecutorGroup executorGroup = new DefaultEventExecutorGroup(1);
	// A new group is created on every call; shut it down when the channel closes so that
	// its thread does not outlive the connection.
	ctx.channel().closeFuture().addListener(new io.netty.channel.ChannelFutureListener() {
		@Override
		public void operationComplete(io.netty.channel.ChannelFuture future) {
			executorGroup.shutdownGracefully();
		}
	});
	EventExecutor e1 = executorGroup.next();
	ChannelPipeline p = ctx.pipeline();
	p.addLast(DECODER, new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
	p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(), 
			JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
	p.addLast(e1, new CommandDelegator());
	p.addLast(ENCODER, new ObjectEncoder());
	p.remove(this);
}
 
源代码25 项目: jumbune   文件: JumbuneAgentDecoder.java
/**
 * Invoke log files send handler.
 *
 * <p>Appends a {@code LogFilesDecoder} (bound to an executor from a dedicated one-thread group),
 * the ack responder, heartbeat handler and string encoder to the pipeline, then removes this
 * decoder from it.
 *
 * @param ctx the ctx
 */
private void invokeLogFilesSendHandlerForHA(ChannelHandlerContext ctx) {
	ChannelPipeline p = ctx.pipeline();
	final DefaultEventExecutorGroup executorGroup = new DefaultEventExecutorGroup(1);
	// A new group is created on every call; shut it down when the channel closes so that
	// its thread does not outlive the connection.
	ctx.channel().closeFuture().addListener(new io.netty.channel.ChannelFutureListener() {
		@Override
		public void operationComplete(io.netty.channel.ChannelFuture future) {
			executorGroup.shutdownGracefully();
		}
	});
	EventExecutor e1 = executorGroup.next();
	p.addLast(e1, new LogFilesDecoder(receiveDirectory));
	p.addLast(ACK_RESPONSER, new AckResponser());
	p.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(), 
			JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
	p.addLast(ENCODER, new StringEncoder());
	p.remove(this);
}
 
源代码26 项目: light-task-scheduler   文件: NettyRemotingClient.java
/**
 * Prepares the client bootstrap: installs the Netty logger factory, creates the worker executor
 * group and registers the channel initializer that adds the codec, idle-state,
 * connection-management and client handlers, all bound to that executor group.
 *
 * @throws RemotingException declared by the overridden method; not thrown directly here
 */
@Override
protected void clientStart() throws RemotingException {

    NettyLogger.setNettyLoggerFactory();

    final int workerThreads = remotingClientConfig.getClientWorkerThreads();
    this.defaultEventExecutorGroup =
            new DefaultEventExecutorGroup(workerThreads, new NamedThreadFactory("NettyClientWorkerThread_"));

    final NettyCodecFactory codecs = new NettyCodecFactory(appContext, getCodec());

    final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(
                    defaultEventExecutorGroup,
                    codecs.getEncoder(),
                    codecs.getDecoder(),
                    new IdleStateHandler(
                            remotingClientConfig.getReaderIdleTimeSeconds(),
                            remotingClientConfig.getWriterIdleTimeSeconds(),
                            remotingClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    new NettyClientHandler());
        }
    };

    this.bootstrap.group(this.eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(initializer);

}
 
源代码27 项目: light-task-scheduler   文件: NettyRemotingServer.java
/**
 * Configures and binds the server bootstrap: installs the Netty logger factory, creates the
 * worker executor group, registers the child channel initializer (codec, idle-state,
 * connection-management and server handlers, all bound to that executor group) and waits for
 * the bind to complete.
 *
 * @throws RemotingException if the thread is interrupted while waiting for the bind; the
 *         interrupt flag is restored before the exception is thrown
 */
@Override
protected void serverStart() throws RemotingException {

    NettyLogger.setNettyLoggerFactory();

    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            remotingServerConfig.getServerWorkerThreads(),
            new NamedThreadFactory("NettyServerWorkerThread_")
    );

    final NettyCodecFactory nettyCodecFactory = new NettyCodecFactory(appContext, getCodec());

    this.serverBootstrap.group(this.bossSelectorGroup, this.workerSelectorGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 65536)
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .localAddress(new InetSocketAddress(this.remotingServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            defaultEventExecutorGroup,
                            nettyCodecFactory.getEncoder(),
                            nettyCodecFactory.getDecoder(),
                            new IdleStateHandler(remotingServerConfig.getReaderIdleTimeSeconds(),
                                    remotingServerConfig.getWriterIdleTimeSeconds(), remotingServerConfig.getServerChannelMaxIdleTimeSeconds()),//
                            new NettyConnectManageHandler(), //
                            new NettyServerHandler());
                }
            });

    try {
        this.serverBootstrap.bind().sync();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RemotingException("Start Netty server bootstrap error", e);
    }
}
 
源代码28 项目: cxf   文件: NettyHttpServletPipelineFactory.java
/**
 * Creates the pipeline factory with its own {@link DefaultEventExecutorGroup} of
 * {@code threadPoolSize} threads and delegates to the constructor that takes an
 * {@link EventExecutorGroup}.
 *
 * @param tlsServerParameters passed through to the delegate constructor
 * @param supportSession passed through to the delegate constructor
 * @param threadPoolSize number of threads of the executor group created here
 * @param maxChunkContentSize passed through to the delegate constructor
 * @param handlerMap passed through to the delegate constructor
 * @param engine passed through to the delegate constructor
 * @deprecated use {@link #NettyHttpServletPipelineFactory(TLSServerParameters, boolean, int, Map,
 * NettyHttpServerEngine, EventExecutorGroup)}
 */
@Deprecated
public NettyHttpServletPipelineFactory(TLSServerParameters tlsServerParameters,
                                       boolean supportSession, int threadPoolSize, int maxChunkContentSize,
                                       Map<String, NettyHttpContextHandler> handlerMap,
                                       NettyHttpServerEngine engine) {
    this(tlsServerParameters, supportSession, maxChunkContentSize, handlerMap, engine,
            new DefaultEventExecutorGroup(threadPoolSize));
}
 
源代码29 项目: dts   文件: NettyRemotingClient.java
/**
 * Starts the remoting client: creates the worker executor group, configures the bootstrap and
 * its channel initializer, schedules the periodic response-table scan and, when a channel event
 * listener is set, starts the event executor.
 */
@Override
public void start() {
    final int workerThreads = nettyClientConfig.getClientWorkerThreads();
    final ThreadFactory workerThreadFactory = new ThreadFactory() {

        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
        }
    };
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(workerThreads, workerThreadFactory);

    this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                // All handlers are registered with the worker executor group.
                ch.pipeline().addLast(
                    defaultEventExecutorGroup,
                    new NettyEncoder(),
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnetManageHandler(),
                    new NettyClientHandler());
            }
        });

    // Scan for timed-out asynchronous calls once per second, after an initial 3 second delay.
    final TimerTask scanTask = new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Exception e) {
                log.error("scanResponseTable exception", e);
            }
        }
    };
    this.timer.scheduleAtFixedRate(scanTask, 1000 * 3, 1000);

    if (this.channelEventListener != null) {
        this.nettyEventExecuter.start();
    }
}
 
源代码30 项目: dts   文件: NettyRemotingServer.java
/**
 * Starts the remoting server: creates the worker executor group, configures and binds the server
 * bootstrap, records the bound port, starts the event executor when a channel event listener is
 * set, and schedules the periodic response-table scan.
 *
 * @throws RuntimeException if the thread is interrupted while waiting for the bind; the
 *         interrupt flag is restored before the exception is thrown
 */
@Override
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
        nettyServerConfig.getServerWorkerThreads(), //
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    ServerBootstrap childHandler = //
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
            .channel(NioServerSocketChannel.class)
            //
            .option(ChannelOption.SO_BACKLOG, 1024)
            //
            .option(ChannelOption.SO_REUSEADDR, true)
            //
            .option(ChannelOption.SO_KEEPALIVE, false)
            //
            .childOption(ChannelOption.TCP_NODELAY, true)
            //
            .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            //
            .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            //
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // All handlers are registered with the worker executor group.
                    ch.pipeline().addLast(
                        //
                        defaultEventExecutorGroup, //
                        new NettyEncoder(), //
                        new NettyDecoder(), //
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //
                        new NettyConnetManageHandler(), //
                        new NettyServerHandler());
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        // This option may consume a large amount of off-heap memory; not used for now.
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        // Read the port back from the bound channel rather than from the configuration.
        InetSocketAddress addr = (InetSocketAddress)sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        // Restore the interrupt flag so callers further up can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecuter.start();
    }

    // Scan for timed-out asynchronous calls once per second, after an initial 3 second delay.
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Exception e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
    log.info("Started Netty Remote Server on " + new Date() + ", The port is:" + this.port);
}
 
 类所在包
 类方法
 同包方法