下面列出了怎么用io.netty.util.concurrent.EventExecutorGroup的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void test_ssl_factory_same() {
final Injector injector = Guice.createInjector(Stage.PRODUCTION, new SecurityModule(), new AbstractModule() {
@Override
protected void configure() {
bind(EventExecutorGroup.class).toInstance(mock(EventExecutorGroup.class));
bind(ShutdownHooks.class).toInstance(mock(ShutdownHooks.class));
bindScope(LazySingleton.class, LazySingletonScope.get());
}
});
final SslFactory instance1 = injector.getInstance(SslFactory.class);
final SslFactory instance2 = injector.getInstance(SslFactory.class);
assertSame(instance1, instance2);
}
@Test
public void test_topic_matcher_not_same() {
final Injector injector = Guice.createInjector(Stage.PRODUCTION,
new HiveMQMainModule(),
new AbstractModule() {
@Override
protected void configure() {
bind(EventExecutorGroup.class).toInstance(Mockito.mock(EventExecutorGroup.class));
}
});
final TopicMatcher instance1 = injector.getInstance(TopicMatcher.class);
final TopicMatcher instance2 = injector.getInstance(TopicMatcher.class);
assertNotSame(instance1, instance2);
}
@Override
public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
if (handlers.length == 0 || handlers[0] == null) {
return this;
}
int size;
for (size = 1; size < handlers.length; size ++) {
if (handlers[size] == null) {
break;
}
}
for (int i = size - 1; i >= 0; i --) {
ChannelHandler h = handlers[i];
addFirst(executor, null, h);
}
return this;
}
@Test(timeout = 3000)
public void testHandlerAddedExceptionFromChildHandlerIsPropagated() {
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
try {
final Promise<Void> promise = group1.next().newPromise();
final AtomicBoolean handlerAdded = new AtomicBoolean();
final Exception exception = new RuntimeException();
ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
pipeline.addFirst(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handlerAdded.set(true);
throw exception;
}
});
assertFalse(handlerAdded.get());
group.register(pipeline.channel());
promise.syncUninterruptibly();
} finally {
group1.shutdownGracefully();
}
}
@Test
public void testPinExecutor() {
EventExecutorGroup group = new DefaultEventExecutorGroup(2);
ChannelPipeline pipeline = new LocalChannel().pipeline();
ChannelPipeline pipeline2 = new LocalChannel().pipeline();
pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter());
pipeline2.addLast(group, "h3", new ChannelInboundHandlerAdapter());
EventExecutor executor1 = pipeline.context("h1").executor();
EventExecutor executor2 = pipeline.context("h2").executor();
assertNotNull(executor1);
assertNotNull(executor2);
assertSame(executor1, executor2);
EventExecutor executor3 = pipeline2.context("h3").executor();
assertNotNull(executor3);
assertNotSame(executor3, executor2);
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
@Override
public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
if (handlers.length == 0 || handlers[0] == null) {
return this;
}
int size;
for (size = 1; size < handlers.length; size ++) {
if (handlers[size] == null) {
break;
}
}
for (int i = size - 1; i >= 0; i --) {
ChannelHandler h = handlers[i];
addFirst(executor, generateName(h), h);
}
return this;
}
public ImapClient(ImapClientConfiguration configuration,
Channel channel,
Supplier<SslContext> sslContextSupplier,
EventExecutorGroup promiseExecutor,
String clientName) {
this.logger = LogUtils.loggerWithName(ImapClient.class, clientName);
this.configuration = configuration;
this.channel = channel;
this.sslContextSupplier = sslContextSupplier;
this.promiseExecutor = promiseExecutor;
this.clientState = new ImapClientState(clientName, promiseExecutor);
this.codec = new ImapCodec(clientState);
this.pendingWriteQueue = new ConcurrentLinkedQueue<>();
this.connectionShutdown = new AtomicBoolean(false);
this.connectionClosed = new AtomicBoolean(false);
this.capabilities = new AtomicReference<>(null);
configureChannel();
}
public ReadWriteExchangeChannelGroup(MsgHandler<Protocol> msgHandler, Address address, int connectTimeout,
int reconnectInterval, byte idleTimeout, byte maxIdleTimeOut, boolean lazy, short connections,
short writeConnections, boolean reverseIndex, EventLoopGroup loopGroup, EventExecutorGroup executorGroup)
throws SailfishException {
super(UUID.randomUUID());
this.msgHandler = msgHandler;
this.tracer = new Tracer();
NegotiateConfig readConfig = new NegotiateConfig(idleTimeout, maxIdleTimeOut, id(), ChannelType.read.code(),
connections, writeConnections, (short) 0, reverseIndex);
this.readGroup = new DefaultExchangeChannelGroup(tracer, msgHandler, address,
(short) (connections - writeConnections), connectTimeout, reconnectInterval, idleTimeout,
maxIdleTimeOut, lazy, reverseIndex, readConfig, this, loopGroup, executorGroup);
NegotiateConfig writeConfig = new NegotiateConfig(idleTimeout, maxIdleTimeOut, id(), ChannelType.write.code(),
connections, writeConnections, (short) 0, reverseIndex);
this.writeGroup = new DefaultExchangeChannelGroup(tracer, msgHandler, address, writeConnections, connectTimeout,
reconnectInterval, idleTimeout, maxIdleTimeOut, lazy, reverseIndex, writeConfig, this, loopGroup,
executorGroup);
}
private ChannelInitializer<SocketChannel> newChannelInitializer(final NegotiateConfig config,
final ExchangeChannelGroup channelGroup, final EventExecutorGroup executorGroup) {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ch.attr(ChannelAttrKeys.maxIdleTimeout).set(config.maxIdleTimeout());
ch.attr(ChannelAttrKeys.channelGroup).set(channelGroup);
ch.attr(ChannelAttrKeys.clientSide).set(true);
ch.attr(OneTime.awaitNegotiate).set(new CountDownLatch(1));
ch.attr(OneTime.channelConfig).set(config);
// TODO should increase ioRatio when every ChannelHandler bind to executorGroup?
pipeline.addLast(executorGroup,
RemotingEncoder.INSTANCE,
new RemotingDecoder(),
new IdleStateHandler(config.idleTimeout(), 0, 0),
HeartbeatChannelHandler.INSTANCE,
NegotiateChannelHandler.INSTANCE,
ConcreteRequestHandler.INSTANCE);
}
};
}
public OFChannelInitializer(Controller controller,
EventExecutorGroup pipelineExecutor,
SSLContext sslContext) {
super();
this.controller = controller;
this.pipelineExecutor = pipelineExecutor;
this.sslContext = sslContext;
}
@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
// 获取handler的名字
name = filterName(name, handler);
// 创建channel handler上下文
newCtx = newContext(group, name, handler);
// 添加handler到handler上下文链表的第一个位置
addFirst0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
@Override
public final ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
synchronized (this) {
checkMultiplicity(handler);
name = filterName(name, handler);
ctx = getContextOrDie(baseName);
newCtx = newContext(group, name, handler);
addBefore0(ctx, newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
public TcpServerPipelineFactory(final int readerIdleTimeSeconds, final LineBasedFrameDecoderFactory framerFactory, final StringDecoder decoder,
final MetricBatcherFactory metricBatcherFactory, final MetricPublishHandler publishHandler, final EventExecutorGroup publishExecutor) {
Assert.notNull(framerFactory, "framerFactory, may not be null");
Assert.notNull(decoder, "decoder may not be null");
Assert.notNull(metricBatcherFactory, "metricBatcherFactory may not be null");
Assert.notNull(publishHandler, "publishHandler may not be null");
Assert.notNull(publishExecutor, "publishExecutor may not be null");
this.readerIdleTimeSeconds = readerIdleTimeSeconds;
this.framerFactory = framerFactory;
this.decoder = decoder;
this.metricBatcherFactory = metricBatcherFactory;
this.publishHandler = publishHandler;
this.publishExecutor = publishExecutor;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
@Override
public Promise<Boolean> release(EventExecutorGroup eventLoopGroup, long quietPeriod, long timeout, TimeUnit unit) {
DefaultPromise<Boolean> result = new DefaultPromise<Boolean>(ImmediateEventExecutor.INSTANCE);
result.setSuccess(true);
return result;
}
public ProtocolDetectHandler(ChannelGroup channelGroup, final Supplier<TelnetHandler> handlerFactory,
Consumer<TtyConnection> ttyConnectionFactory, EventExecutorGroup workerGroup) {
this.channelGroup = channelGroup;
this.handlerFactory = handlerFactory;
this.ttyConnectionFactory = ttyConnectionFactory;
this.workerGroup = workerGroup;
}
public NettyEmbeddedServletInitializer(EventExecutorGroup servletExecutor, NettyEmbeddedContext servletContext, SslContext sslContext) {
this.servletContext = servletContext;
this.servletExecutor = checkNotNull(servletExecutor);
this.sslContext=sslContext;
requestDispatcherHandler = new RequestDispatcherHandler(servletContext);
}
<T extends ServerSocketChannel> RpcServer(EventLoopGroup eventLoopGroup, EventExecutorGroup eventExecutor, Class<T> channel, SocketAddress address) {
this.address = address;
this.allChannels = new DefaultChannelGroup(eventLoopGroup.next());
this.handler = new ServerHandler(allChannels);
this.bootstrap = new ServerBootstrap();
bootstrap.channel(channel);
bootstrap.childHandler(new ServerInitializer(eventExecutor, handler));
bootstrap.group(eventLoopGroup);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
}
public ResponseDecoder(ImapClientConfiguration configuration,
ImapClientState clientState,
EventExecutorGroup executorGroup) {
super(State.SKIP_CONTROL_CHARS);
this.logger = LogUtils.loggerWithName(ResponseDecoder.class, clientState.getClientName());
this.clientState = clientState;
this.executorGroup = executorGroup;
this.charSeq = new SoftReferencedAppendableCharSequence(configuration.defaultResponseBufferSize());
this.lineParser = new LineParser(charSeq, configuration.maxLineLength());
this.wordParser = new WordParser(charSeq, configuration.maxLineLength());
this.fetchResponseTypeParser = new FetchResponseTypeParser(charSeq, configuration.maxLineLength());
this.atomOrStringParser = new AtomOrStringParser(charSeq, configuration.maxLineLength());
this.literalStringParser = new LiteralStringParser(charSeq, configuration.maxLineLength());
this.bufferedBodyParser = new BufferedBodyParser(charSeq);
this.numberParser = new NumberParser(charSeq, 19);
this.envelopeParser = new EnvelopeParser();
this.nestedArrayParserRecycler = new NestedArrayParser.Recycler<>(literalStringParser);
this.messageBuilder = ((DefaultMessageBuilder) MESSAGE_SERVICE_FACTORY.newMessageBuilder());
MimeConfig mimeConfig = MimeConfig.custom()
.setMaxLineLen(configuration.maxLineLength())
.setMaxHeaderLen(configuration.maxLineLength())
.setMaxHeaderCount(configuration.maxHeaderCount())
.build();
messageBuilder.setMimeEntityConfig(mimeConfig);
this.untaggedResponses = new ArrayList<>();
this.responseBuilder = new TaggedResponse.Builder();
this.allBytesParser = configuration.tracingEnabled() ? new AllBytesParser(charSeq) : null;
}
@Default
default EventExecutorGroup executor() {
Logger logger = LoggerFactory.getLogger("imap-executor");
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setUncaughtExceptionHandler((t, e) -> logger.error("Uncaught exception on thread {}", t.getName(), e))
.setNameFormat("imap-executor-%d")
.build();
int nThreads = Runtime.getRuntime().availableProcessors() * 2;
return new DefaultEventExecutorGroup(nThreads, threadFactory);
}
public ImapClientState(String clientName, EventExecutorGroup executorGroup) {
this.clientName = clientName;
this.executorGroup = executorGroup;
this.currentCommand = new AtomicReference<>();
this.commandCount = new AtomicLong(0);
this.messageNumber = new AtomicLong(0);
this.messageAddListeners = new CopyOnWriteArrayList<>();
this.openEventListeners = new CopyOnWriteArrayList<>();
this.connectionListeners = new CopyOnWriteArrayList<>();
this.handlers = new CopyOnWriteArrayList<>();
}
protected MultiConnectionsExchangeChannelGroup(Tracer tracer, MsgHandler<Protocol> msgHandler, Address address,
short connections, int connectTimeout, int reconnectInterval, byte idleTimeout, byte maxIdleTimeOut,
boolean lazy, boolean reverseIndex, NegotiateConfig config, ExchangeChannelGroup parentGroup,
EventLoopGroup loopGroup, EventExecutorGroup executorGroup) throws SailfishException {
this.tracer = tracer;
this.msgHandler = msgHandler;
children = new ExchangeChannel[connections];
deadChildren = new ExchangeChannel[connections];
if (null == config) {
config = new NegotiateConfig(idleTimeout, maxIdleTimeOut, id(), ChannelType.readwrite.code(),
(short) connections, (short) connections, (short) 0, reverseIndex);
}
Bootstrap bootstrap = null;
for (short i = 0; i < connections; i++) {
boolean success = false;
final NegotiateConfig deepCopy = config.deepCopy().index(i);
parentGroup = (null == parentGroup ? this : parentGroup);
bootstrap = configureBoostrap(address, connectTimeout, deepCopy, parentGroup, loopGroup, executorGroup);
try {
children[i] = newChild(parentGroup, bootstrap, reconnectInterval, lazy, deepCopy.isRead());
success = true;
} catch (SailfishException cause) {
throw cause;
} finally {
if (!success) {
close(Integer.MAX_VALUE);
}
}
}
chooser = DefaultExchangeChannelChooserFactory.INSTANCE.newChooser(children, deadChildren);
}
public NettyHttpServletPipelineFactory(TLSServerParameters tlsServerParameters,
boolean supportSession, int maxChunkContentSize,
Map<String, NettyHttpContextHandler> handlerMap,
NettyHttpServerEngine engine, EventExecutorGroup applicationExecutor) {
this.supportSession = supportSession;
this.watchdog = new HttpSessionWatchdog();
this.handlerMap = handlerMap;
this.tlsServerParameters = tlsServerParameters;
this.maxChunkContentSize = maxChunkContentSize;
this.nettyHttpServerEngine = engine;
this.applicationExecutor = applicationExecutor;
}
public DefaultExchangeChannelGroup(Tracer tracer, MsgHandler<Protocol> msgHandler, Address address,
short connections, int connectTimeout, int reconnectInterval, byte idleTimeout, byte maxIdleTimeOut,
boolean lazy, boolean reverseIndex, NegotiateConfig config, ExchangeChannelGroup parentGroup,
EventLoopGroup loopGroup, EventExecutorGroup executorGroup) throws SailfishException {
super(tracer, msgHandler, address, connections, connectTimeout, reconnectInterval, idleTimeout, maxIdleTimeOut,
lazy, reverseIndex, config, parentGroup, loopGroup, executorGroup);
}
protected Bootstrap configureBoostrap(Address remoteAddress, int connectTimeout, NegotiateConfig config,
ExchangeChannelGroup channelGroup, EventLoopGroup loopGroup, EventExecutorGroup executorGroup) {
Bootstrap boot = newBootstrap();
if (null == loopGroup) {
loopGroup = ClientEventGroup.INSTANCE.getLoopGroup();
}
if (null == executorGroup) {
executorGroup = ClientEventGroup.INSTANCE.getExecutorGroup();
}
boot.group(loopGroup);
boot.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
boot.remoteAddress(remoteAddress.host(), remoteAddress.port());
boot.handler(newChannelInitializer(config, channelGroup, executorGroup));
return boot;
}
public void start() throws SailfishException {
ServerBootstrap boot = newServerBootstrap();
EventLoopGroup accept = NettyPlatformIndependent.newEventLoopGroup(1,
new DefaultThreadFactory(RemotingConstants.SERVER_ACCEPT_THREADNAME));
if (null != config.getEventLoopGroup()) {
boot.group(accept, config.getEventLoopGroup());
} else {
boot.group(accept, ServerEventGroup.INSTANCE.getLoopGroup());
}
final EventExecutorGroup executor = (null != config.getEventExecutorGroup() ? config.getEventExecutorGroup()
: ServerEventGroup.INSTANCE.getExecutorGroup());
boot.localAddress(config.address().host(), config.address().port());
boot.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ch.attr(ChannelAttrKeys.OneTime.idleTimeout).set(config.idleTimeout());
ch.attr(ChannelAttrKeys.maxIdleTimeout).set(config.maxIdleTimeout());
ch.attr(ChannelAttrKeys.exchangeServer).set(DefaultServer.this);
pipeline.addLast(executor,
RemotingEncoder.INSTANCE,
new RemotingDecoder(),
new IdleStateHandler(config.idleTimeout(), 0, 0),
HeartbeatChannelHandler.INSTANCE,
NegotiateChannelHandler.INSTANCE,
ConcreteRequestHandler.INSTANCE);
}
});
try {
channel = boot.bind().syncUninterruptibly().channel();
} catch (Throwable cause) {
throw new SailfishException(cause);
}
}
public PeerChannelInitializer(Config config, ObjectEncoder encoder, EventExecutorGroup peerChannelHandlerExecutorGroup,
PeerChannelHandler peerChannelHandler) {
this.config = config;
this.encoder = encoder;
this.peerChannelHandlerExecutorGroup = peerChannelHandlerExecutorGroup;
this.peerChannelHandler = peerChannelHandler;
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutorGroup group, String name,
boolean inbound, boolean outbound) {
if (name == null) {
throw new NullPointerException("name");
}
channel = pipeline.channel;
this.pipeline = pipeline;
this.name = name;
if (group != null) {
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = pipeline.childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
pipeline.childExecutors.put(group, childExecutor);
}
executor = childExecutor;
} else {
executor = null;
}
this.inbound = inbound;
this.outbound = outbound;
}
@Override
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addFirst0(name, newCtx);
}
return this;
}
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
}
return this;
}