下面列出了 io.netty.util.concurrent.EventExecutor 类 API 的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Reacts to an unrecoverable error: cancels request streaming, calls
 * {@code setDownstreamCallTimeOnRequestAttributesIfNotAlreadyDone()}, and then sends the error down the
 * pipeline on the channel's executor (directly when already on that executor's thread).
 *
 * @param error the error that occurred
 * @param guaranteesBrokenDownstreamResponse forwarded unchanged to
 *        {@code sendUnrecoverableErrorDownPipeline}
 */
@Override
public void unrecoverableErrorOccurred(Throwable error, boolean guaranteesBrokenDownstreamResponse) {
    // Cancel request streaming so it stops trying to send data downstream and releases any chunks we've
    // been holding onto. This applies regardless of guaranteesBrokenDownstreamResponse (i.e. we want to
    // stop sending data downstream no matter what). Note that this does not stop the downstream call's
    // response, and that is intentional to support use cases where the downstream system can still
    // successfully send a full response even though the request wasn't fully sent.
    proxyRouterProcessingState.cancelRequestStreaming(error, ctx);
    setDownstreamCallTimeOnRequestAttributesIfNotAlreadyDone();

    EventExecutor channelExecutor = ctx.executor();
    if (!channelExecutor.inEventLoop()) {
        // Hop onto the channel's executor before touching the pipeline.
        channelExecutor.execute(
                () -> sendUnrecoverableErrorDownPipeline(error, guaranteesBrokenDownstreamResponse));
        return;
    }
    sendUnrecoverableErrorDownPipeline(error, guaranteesBrokenDownstreamResponse);
}
/**
 * Adds the nonce to {@code invalidNonces} and schedules an {@code InvalidNonceCleaner} for it.
 *
 * @param nonce    the nonce to record; its {@code timeStamp} and {@code nonce} fields are read
 * @param executor executor on which the cleaner task is scheduled
 * @return {@code true} if the nonce was newly added and a cleaner was scheduled; {@code false} if it was
 *         already present, or if its timestamp is at or before {@code now - firstUseTimeOut}
 */
private boolean addInvalidNonce(final Nonce nonce, final EventExecutor executor) {
    final long invalidBefore = System.currentTimeMillis() - firstUseTimeOut;
    final long millisUntilInvalid = nonce.timeStamp - invalidBefore;

    if (millisUntilInvalid <= 0) {
        // So close to expiring any record of this nonce being used could have been cleared so
        // don't take a chance and just say no.
        return false;
    }
    if (!invalidNonces.add(nonce.nonce)) {
        // Already recorded.
        return false;
    }
    executor.schedule(new InvalidNonceCleaner(nonce.nonce), millisUntilInvalid, TimeUnit.MILLISECONDS);
    return true;
}
/**
 * Marks the handler as initialized and schedules the first {@code WriterIdleTimeoutTask} on the
 * channel's executor. Does nothing beyond logging when {@code state} is already 1 or 2.
 *
 * @param ctx the handler context whose executor runs the scheduled task
 */
private void initialize(ChannelHandlerContext ctx) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Initializing autoflush handler on channel {}", ctx.channel());
    }

    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    if (state == 1 || state == 2) {
        return;
    }

    state = 1;
    final EventExecutor channelExecutor = ctx.executor();
    lastExecutionTime = System.nanoTime();
    resenderTimeout = channelExecutor.schedule(
            new WriterIdleTimeoutTask(ctx), resenderTimeNanos, TimeUnit.NANOSECONDS);
}
/**
 * Marks the handler as initialized, records the current time as the last write time, and schedules the
 * first {@code WriterIdleTimeoutTask} on the channel's executor. Does nothing beyond logging when
 * {@code state} is already 1 or 2.
 *
 * @param ctx the handler context whose executor runs the scheduled task
 */
private void initialize(ChannelHandlerContext ctx) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Initializing autoflush handler on channel {} Cid: {}", ctx.channel(),
                NettyUtils.clientID(ctx.channel()));
    }

    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    if (state == 1 || state == 2) {
        return;
    }

    state = 1;
    final EventExecutor channelExecutor = ctx.executor();
    lastWriteTime = System.nanoTime();
    writerIdleTimeout = channelExecutor.schedule(
            new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
/**
 * Performs TLS renegotiation by invoking {@code handshake(promise)} on the handler context's executor.
 *
 * @param promise the promise handed to {@code handshake}
 * @return the same {@code promise} instance
 * @throws NullPointerException  if {@code promise} is {@code null}
 * @throws IllegalStateException if this handler has no {@link ChannelHandlerContext} yet
 */
public Future<Channel> renegotiate(final Promise<Channel> promise) {
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    final ChannelHandlerContext handlerCtx = this.ctx;
    if (handlerCtx == null) {
        throw new IllegalStateException();
    }

    final EventExecutor eventExecutor = handlerCtx.executor();
    if (eventExecutor.inEventLoop()) {
        handshake(promise);
    } else {
        // Not on the executor's thread: defer the handshake to it.
        eventExecutor.execute(new Runnable() {
            @Override
            public void run() {
                handshake(promise);
            }
        });
    }
    return promise;
}
/**
 * Marks the handler as initialized and schedules the first {@code WriterIdleTimeoutTask} on the
 * channel's executor. Does nothing beyond logging when {@code state} is already 1 or 2.
 *
 * @param ctx the handler context whose executor runs the scheduled task
 */
private void initialize(ChannelHandlerContext ctx) {
    if (logger.isDebugEnabled()) {
        logger.debug("Initializing autoflush handler on channel {}", ctx.channel());
    }

    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    if (state == 1 || state == 2) {
        return;
    }

    state = 1;
    final EventExecutor channelExecutor = ctx.executor();
    lastExecutionTime = System.nanoTime();
    resenderTimeout = channelExecutor.schedule(
            new WriterIdleTimeoutTask(ctx), resenderTimeNanos, TimeUnit.NANOSECONDS);
}
/**
 * Stores the collaborators and, when a discovery client is supplied, calls
 * {@code initDiscoveryListener()}.
 *
 * @param channels        the channel group stored on this instance
 * @param executor        the executor stored on this instance
 * @param discoveryClient the Eureka client; may be {@code null}, in which case no discovery listener
 *                        is initialised
 */
public ClientConnectionsShutdown(ChannelGroup channels, EventExecutor executor, EurekaClient discoveryClient)
{
    this.discoveryClient = discoveryClient;
    this.executor = executor;
    this.channels = channels;

    // All fields are assigned before the listener is initialised.
    if (this.discoveryClient != null) {
        initDiscoveryListener();
    }

    // Only uncomment this for local testing!
    // Allow a fast property to invoke connection shutdown for testing purposes.
    // DynamicBooleanProperty DEBUG_SHUTDOWN = new DynamicBooleanProperty("server.outofservice.connections.shutdown.debug", false);
    // DEBUG_SHUTDOWN.addCallback(() -> {
    // if (DEBUG_SHUTDOWN.get()) {
    // gracefullyShutdownClientChannels();
    // }
    // });
}
/**
 * Passes a deregister request to the next outbound context, running it on that context's executor.
 *
 * @param promise the promise passed to the next context's {@code invokeDeregister}
 * @return the given {@code promise}
 */
@Override
public ChannelFuture deregister(final ChannelPromise promise) {
    if (!validatePromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        safeExecute(nextExecutor, new OneTimeTask() {
            @Override
            public void run() {
                next.invokeDeregister(promise);
            }
        }, promise, null);
        return promise;
    }

    next.invokeDeregister(promise);
    return promise;
}
/**
 * Delivers an exceptionCaught event to {@code next} on its executor. If handing the task to the
 * executor fails, both the submission failure and the original cause are logged at WARN level and
 * nothing is rethrown.
 *
 * @param next  the context that should receive the event
 * @param cause the exception to deliver; must not be {@code null}
 */
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");

    final EventExecutor nextExecutor = next.executor();
    if (nextExecutor.inEventLoop()) {
        next.invokeExceptionCaught(cause);
        return;
    }

    try {
        nextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeExceptionCaught(cause);
            }
        });
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to submit an exceptionCaught() event.", t);
            logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
        }
    }
}
/**
 * Finishes encoding via {@code finishEncode}, on the handler context's executor.
 *
 * @param promise notified once the encode has finished
 * @return the future returned by {@code finishEncode} when called on the executor's thread; otherwise
 *         a newly created promise that is passed to {@code finishEncode} on the executor
 */
@Override
public ChannelFuture close(final ChannelPromise promise) {
    final ChannelHandlerContext handlerCtx = ctx();
    final EventExecutor eventExecutor = handlerCtx.executor();
    if (eventExecutor.inEventLoop()) {
        return finishEncode(handlerCtx, promise);
    }

    // Off the executor's thread: run finishEncode there against an intermediate promise and
    // relay its outcome to the caller's promise.
    final ChannelPromise encodeDone = handlerCtx.newPromise();
    eventExecutor.execute(new Runnable() {
        @Override
        public void run() {
            ChannelFuture finished = finishEncode(ctx(), encodeDone);
            finished.addListener(new ChannelPromiseNotifier(promise));
        }
    });
    return encodeDone;
}
/**
 * Delivers a channelWritabilityChanged event to {@code next} on its executor. The {@link Runnable}
 * used for the off-thread case is created on first use and stored on the context in
 * {@code invokeChannelWritableStateChangedTask} so later calls reuse it.
 *
 * @param next the context that should receive the event
 */
static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (nextExecutor.inEventLoop()) {
        next.invokeChannelWritabilityChanged();
        return;
    }

    Runnable cachedTask = next.invokeChannelWritableStateChangedTask;
    if (cachedTask == null) {
        cachedTask = new Runnable() {
            @Override
            public void run() {
                next.invokeChannelWritabilityChanged();
            }
        };
        next.invokeChannelWritableStateChangedTask = cachedTask;
    }
    nextExecutor.execute(cachedTask);
}
/**
 * Passes a bind request to the next outbound context, running it on that context's executor.
 *
 * @param localAddress the address to bind to; must not be {@code null}
 * @param promise      the promise passed to the next context's {@code invokeBind}
 * @return the given {@code promise}
 * @throws NullPointerException if {@code localAddress} is {@code null}
 */
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (!validatePromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        safeExecute(nextExecutor, new OneTimeTask() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
        return promise;
    }

    next.invokeBind(localAddress, promise);
    return promise;
}
/**
 * Closes this {@link Bzip2Encoder} and so finishes the encoding.
 * The given promise is handed to {@code finishEncode} on the handler context's executor.
 *
 * @param promise the promise passed to {@code finishEncode}
 * @return the future returned by {@code finishEncode} when called on the executor's thread; otherwise
 *         the given {@code promise}
 */
public ChannelFuture close(final ChannelPromise promise) {
    final ChannelHandlerContext handlerCtx = ctx();
    final EventExecutor eventExecutor = handlerCtx.executor();
    if (eventExecutor.inEventLoop()) {
        return finishEncode(handlerCtx, promise);
    }

    eventExecutor.execute(new Runnable() {
        @Override
        public void run() {
            ChannelFuture finished = finishEncode(ctx(), promise);
            // NOTE(review): if finishEncode returns the very promise it was given, this listener
            // notifies an already-completed promise -- confirm whether it is still needed.
            finished.addListener(new ChannelPromiseNotifier(promise));
        }
    });
    return promise;
}
@SuppressWarnings("unchecked")
StreamMessageProcessor(DefaultStreamMessageDuplicator<T> duplicator, StreamMessage<T> upstream,
SignalLengthGetter<?> signalLengthGetter,
EventExecutor executor, long maxSignalLength) {
this.duplicator = duplicator;
this.upstream = upstream;
this.signalLengthGetter = (SignalLengthGetter<Object>) signalLengthGetter;
this.executor = executor;
if (maxSignalLength == 0 || maxSignalLength > Integer.MAX_VALUE) {
this.maxSignalLength = Integer.MAX_VALUE;
} else {
this.maxSignalLength = (int) maxSignalLength;
}
signals = new SignalQueue(this.signalLengthGetter);
upstream.subscribe(this, executor,
WITH_POOLED_OBJECTS, SubscriptionOption.NOTIFY_CANCELLATION);
}
/**
 * Passes a user event to the next inbound context, running it on that context's executor.
 *
 * @param event the event to deliver; must not be {@code null}
 * @return this context
 * @throws NullPointerException if {@code event} is {@code null}
 */
@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
    if (event == null) {
        throw new NullPointerException("event");
    }

    final AbstractChannelHandlerContext next = findContextInbound();
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        nextExecutor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeUserEventTriggered(event);
            }
        });
        return this;
    }

    next.invokeUserEventTriggered(event);
    return this;
}
/**
 * Runs a task in the given {@link EventExecutor}. The task runs synchronously on the calling thread
 * when that thread is already in the executor's event loop; otherwise it is submitted to the executor.
 *
 * @param eventExecutor Executor to run task in.
 * @param runnable Task to run.
 */
public static void doInEventLoop(EventExecutor eventExecutor, Runnable runnable) {
    if (!eventExecutor.inEventLoop()) {
        // NOTE(review): the future returned by submit() is discarded, so a failure of the task is
        // presumably only recorded on that future -- confirm this is intended.
        eventExecutor.submit(runnable);
        return;
    }
    runnable.run();
}
/**
 * Creates a context bound to {@code pipeline}. When an executor group is given, one child executor is
 * picked per (pipeline, group) pair and cached in {@code pipeline.childExecutors}; without a group the
 * {@code executor} field is left {@code null}.
 *
 * @param pipeline the owning pipeline
 * @param group    optional executor group; may be {@code null}
 * @param name     the context name; must not be {@code null}
 * @param inbound  stored in the {@code inbound} field
 * @param outbound stored in the {@code outbound} field
 * @throws NullPointerException if {@code name} is {@code null}
 */
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutorGroup group, String name,
                              boolean inbound, boolean outbound) {
    if (name == null) {
        throw new NullPointerException("name");
    }

    channel = pipeline.channel;
    this.pipeline = pipeline;
    this.name = name;

    if (group == null) {
        executor = null;
    } else {
        // Pin one of the child executors once and remember it so that the same child executor
        // is used to fire events for the same channel.
        EventExecutor pinned = pipeline.childExecutors.get(group);
        if (pinned == null) {
            pinned = group.next();
            pipeline.childExecutors.put(group, pinned);
        }
        executor = pinned;
    }

    this.inbound = inbound;
    this.outbound = outbound;
}
/**
 * Returns the executor of the handler context held in {@code ctx}.
 *
 * @return {@code ctx.executor()}
 * @throws IllegalStateException if {@code ctx} is {@code null}
 */
@Override
protected EventExecutor executor() {
    if (ctx != null) {
        return ctx.executor();
    }
    throw new IllegalStateException();
}
/**
 * Returns the {@link AddressResolver} associated with the specified {@link EventExecutor}. If no
 * resolver is associated yet, one is created via {@link #newResolver(EventExecutor)}, cached, and
 * returned, so that a later {@link #getResolver(EventExecutor)} call with the same
 * {@link EventExecutor} reuses it. A listener on the executor's termination future removes the
 * cached entry and closes the resolver.
 *
 * @param executor the executor whose resolver is requested; must not be {@code null}
 * @return the cached or newly created resolver
 * @throws NullPointerException  if {@code executor} is {@code null}
 * @throws IllegalStateException if the executor is shutting down, or if creating a resolver fails
 */
public AddressResolver<T> getResolver(final EventExecutor executor) {
    if (executor == null) {
        throw new NullPointerException("executor");
    }
    if (executor.isShuttingDown()) {
        throw new IllegalStateException("executor not accepting a task");
    }

    synchronized (resolvers) {
        final AddressResolver<T> cached = resolvers.get(executor);
        if (cached != null) {
            return cached;
        }

        final AddressResolver<T> created;
        try {
            created = newResolver(executor);
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a new resolver", e);
        }

        resolvers.put(executor, created);
        executor.terminationFuture().addListener(new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                synchronized (resolvers) {
                    resolvers.remove(executor);
                }
                created.close();
            }
        });
        return created;
    }
}
/**
 * Returns the executor supplied by the superclass, falling back to the channel's event loop when the
 * superclass returns {@code null}.
 *
 * @return the executor from {@code super.executor()}, or {@code channel().eventLoop()}
 */
@Override
protected EventExecutor executor() {
    final EventExecutor inherited = super.executor();
    return inherited != null ? inherited : channel().eventLoop();
}
/**
 * Submits a no-op task to every executor in {@code serverGroup.clientToProxyWorkerPool} and blocks,
 * one executor at a time, until that task has completed.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException   if waiting on a submitted task fails
 */
@VisibleForTesting
public void waitForEachEventLoop() throws InterruptedException, ExecutionException
{
    for (EventExecutor eventLoop : serverGroup.clientToProxyWorkerPool)
    {
        eventLoop.submit(() -> {
            // Do nothing.
        }).get();
    }
}
/**
 * Creates a new instance.
 *
 * @param channel  the {@link Channel} associated with this future
 * @param executor the executor passed to the superclass constructor
 * @param cause    the cause of failure; must not be {@code null}
 * @throws NullPointerException if {@code cause} is {@code null}
 */
FailedChannelFuture(Channel channel, EventExecutor executor, Throwable cause) {
    super(channel, executor);
    if (cause != null) {
        this.cause = cause;
    } else {
        throw new NullPointerException("cause");
    }
}
/**
 * Creates a group future over the given per-channel futures. {@code childListener} is attached to
 * every future, and the group future is completed successfully right away when there are none.
 *
 * @param group    the channel group stored on this future
 * @param futures  the per-channel futures; stored behind an unmodifiable view
 * @param executor the executor passed to the superclass constructor
 */
DefaultChannelGroupFuture(ChannelGroup group, Map<Channel, ChannelFuture> futures, EventExecutor executor) {
    super(executor);
    this.group = group;
    this.futures = Collections.unmodifiableMap(futures);

    for (ChannelFuture childFuture : this.futures.values()) {
        childFuture.addListener(childListener);
    }

    // Done on arrival?
    if (this.futures.isEmpty()) {
        setSuccess0();
    }
}
/**
 * Appends the log-file receive handlers to the channel pipeline and then removes this handler from it.
 * The {@code LogFilesEncoder} is registered with its own single-threaded executor.
 *
 * @param ctx the handler context whose pipeline is modified
 */
private void invokeLogFilesReceiveHandlerForHA(ChannelHandlerContext ctx) {
    final ChannelPipeline pipeline = ctx.pipeline();

    // NOTE(review): a new DefaultEventExecutorGroup is created on every call and nothing visible in
    // this method shuts it down -- confirm it is released elsewhere, otherwise its thread leaks.
    final EventExecutor encoderExecutor = new DefaultEventExecutorGroup(1).next();

    pipeline.addLast("stringDecoder", new StringDecoder());
    pipeline.addLast("delegator", new Delegator(receiveDirectory));
    pipeline.addLast(HEARTBEAT_HANDLER, new HeartbeatHandler(JumbuneAgent.getHeartBeatMillis(),
            JumbuneAgent.getHeartBeatMillis(), JumbuneAgent.getHeartBeatMillis()));
    pipeline.addLast("stringEncoder", new StringEncoder());
    pipeline.addLast(encoderExecutor, new LogFilesEncoder());

    // This handler is removed only after its replacements have been added.
    pipeline.remove(this);
}
/**
 * Returns the executor supplied by the superclass, or the channel's event loop when the superclass
 * has none.
 *
 * @return the executor from {@code super.executor()}, or {@code channel().eventLoop()}
 */
@Override
protected EventExecutor executor() {
    final EventExecutor fromSuper = super.executor();
    if (fromSuper != null) {
        return fromSuper;
    }
    return channel().eventLoop();
}
/**
 * Schedules a task on {@code executor} that, after {@code requestTimeout}, reports a
 * {@code RequestTimeoutException} through {@code onChannelError}. The scheduled handle is stored in
 * {@code timeout}. If scheduling fails, the failure is reported through {@code onChannelError} wrapped
 * in a {@code TTransportException} and then rethrown.
 *
 * @param executor executor on which the timeout task is scheduled
 */
void registerRequestTimeout(EventExecutor executor)
{
    try {
        Runnable reportTimeout =
                () -> onChannelError(new RequestTimeoutException("Timed out waiting " + requestTimeout + " to receive response"));
        timeout.set(executor.schedule(reportTimeout, requestTimeout.toMillis(), MILLISECONDS));
    }
    catch (Throwable throwable) {
        onChannelError(new TTransportException("Unable to schedule request timeout", throwable));
        throw throwable;
    }
}
/**
 * Creates a subscription; every argument is stored unchanged in the field of the same name.
 *
 * @param streamMessage      the child stream message this subscription belongs to
 * @param subscriber         the downstream subscriber
 * @param processor          the shared stream message processor
 * @param executor           the executor associated with this subscription
 * @param withPooledObjects  stored in the {@code withPooledObjects} field
 * @param notifyCancellation stored in the {@code notifyCancellation} field
 */
DownstreamSubscription(ChildStreamMessage<T> streamMessage,
                       Subscriber<? super T> subscriber, StreamMessageProcessor<T> processor,
                       EventExecutor executor, boolean withPooledObjects, boolean notifyCancellation) {
    // Collaborators.
    this.streamMessage = streamMessage;
    this.processor = processor;
    this.subscriber = subscriber;
    this.executor = executor;

    // Subscription options.
    this.notifyCancellation = notifyCancellation;
    this.withPooledObjects = withPooledObjects;
}
/**
 * Returns the executor assigned to this context, or the channel's event loop when none was assigned.
 *
 * @return the {@code executor} field if non-null, otherwise {@code channel().eventLoop()}
 */
@Override
public EventExecutor executor() {
    return executor != null ? executor : channel().eventLoop();
}
/**
 * Delivers a channelUnregistered event to {@code next}, directly when the caller is on the context's
 * executor thread and via {@code execute} otherwise.
 *
 * @param next the context that should receive the event
 */
static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        nextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelUnregistered();
            }
        });
        return;
    }
    next.invokeChannelUnregistered();
}
/**
 * Delivers a user event to {@code next}, directly when the caller is on the context's executor thread
 * and via {@code execute} otherwise.
 *
 * @param next  the context that should receive the event
 * @param event the event to deliver; must not be {@code null}
 */
static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
    ObjectUtil.checkNotNull(event, "event");

    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        nextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeUserEventTriggered(event);
            }
        });
        return;
    }
    next.invokeUserEventTriggered(event);
}