下面列出了 io.netty.channel.ChannelHandlerContext#executor() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Finishes the encoder by delegating to {@code finishEncode}, making sure that call happens
 * on the executor of this handler's context.
 *
 * @param promise the promise supplied by the caller
 * @return the future returned by {@code finishEncode} when invoked on the executor thread;
 *         otherwise a newly created promise that is handed to {@code finishEncode} later
 */
@Override
public ChannelFuture close(final ChannelPromise promise) {
    final ChannelHandlerContext context = ctx();
    final EventExecutor eventExecutor = context.executor();
    if (!eventExecutor.inEventLoop()) {
        // Called from a foreign thread: defer the work and expose an intermediate promise.
        final ChannelPromise intermediate = context.newPromise();
        eventExecutor.execute(new Runnable() {
            @Override
            public void run() {
                // The context is looked up again when the task actually runs.
                ChannelFuture encodeFuture = finishEncode(ctx(), intermediate);
                // Relay the outcome to the promise the caller passed in.
                encodeFuture.addListener(new ChannelPromiseNotifier(promise));
            }
        });
        return intermediate;
    }
    return finishEncode(context, promise);
}
/**
 * Performs the one-time setup of this handler: marks it as initialized, records the current
 * time and schedules the first {@code WriterIdleTimeoutTask} on the context's executor.
 * Does nothing (apart from the debug log) when {@code state} is already 1 or 2.
 *
 * @param ctx the context whose executor is used for scheduling
 */
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    if (LOG.isDebugEnabled()) {
        LOG.debug("Initializing autoflush handler on channel {}", ctx.channel());
    }
    // 1 is the value set below after initialization; 2 is presumably "destroyed" - TODO confirm.
    final int currentState = state;
    if (currentState == 1 || currentState == 2) {
        return;
    }
    state = 1;
    final EventExecutor scheduler = ctx.executor();
    lastExecutionTime = System.nanoTime();
    resenderTimeout = scheduler.schedule(
            new WriterIdleTimeoutTask(ctx), resenderTimeNanos, TimeUnit.NANOSECONDS);
}
/**
 * Opens a new stream channel via {@code open0}, running that call on the executor of the
 * {@code Http2MultiplexCodec} context found in the channel's pipeline.
 *
 * <p>If the pipeline has no such context the promise is failed instead: with an
 * {@link IllegalStateException} while the channel is active, otherwise with a
 * {@link ClosedChannelException}.
 *
 * @param promise the promise passed on to {@code open0} or failed directly
 * @return the same {@code promise}
 */
public Future<Http2StreamChannel> open(final Promise<Http2StreamChannel> promise) {
    final ChannelHandlerContext codecCtx = channel.pipeline().context(Http2MultiplexCodec.class);
    if (codecCtx != null) {
        final EventExecutor codecExecutor = codecCtx.executor();
        if (!codecExecutor.inEventLoop()) {
            // Hop onto the codec's executor before touching it.
            codecExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    open0(codecCtx, promise);
                }
            });
        } else {
            open0(codecCtx, promise);
        }
        return promise;
    }

    // No codec in the pipeline: pick the failure cause depending on the channel state.
    final Throwable cause;
    if (channel.isActive()) {
        cause = new IllegalStateException(StringUtil.simpleClassName(Http2MultiplexCodec.class) +
                " must be in the ChannelPipeline of Channel " + channel);
    } else {
        cause = new ClosedChannelException();
    }
    promise.setFailure(cause);
    return promise;
}
/**
 * Performs TLS renegotiation by invoking {@code handshake(promise)} on the executor of the
 * stored handler context.
 *
 * @param promise the promise handed to {@code handshake}
 * @return the same {@code promise}
 * @throws NullPointerException if {@code promise} is {@code null}
 * @throws IllegalStateException if no handler context has been stored yet
 */
public Future<Channel> renegotiate(final Promise<Channel> promise) {
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    final ChannelHandlerContext handlerCtx = this.ctx;
    if (handlerCtx == null) {
        throw new IllegalStateException();
    }

    final EventExecutor eventExecutor = handlerCtx.executor();
    if (eventExecutor.inEventLoop()) {
        handshake(promise);
    } else {
        // Not on the executor thread: schedule the handshake there instead.
        eventExecutor.execute(new OneTimeTask() {
            @Override
            public void run() {
                handshake(promise);
            }
        });
    }
    return promise;
}
/**
 * Performs TLS renegotiation. The actual work is done by {@code handshake(promise)}, which is
 * called directly when already on the context's executor thread and submitted to that
 * executor otherwise.
 *
 * @param promise the promise handed to {@code handshake}
 * @return the same {@code promise}
 * @throws NullPointerException if {@code promise} is {@code null}
 * @throws IllegalStateException if the handler context field is still {@code null}
 */
public Future<Channel> renegotiate(final Promise<Channel> promise) {
    if (promise == null) {
        throw new NullPointerException("promise");
    }
    final ChannelHandlerContext currentCtx = this.ctx;
    if (currentCtx == null) {
        throw new IllegalStateException();
    }
    final EventExecutor loop = currentCtx.executor();
    if (loop.inEventLoop()) {
        // Already on the right thread.
        handshake(promise);
        return promise;
    }
    final Runnable handshakeTask = new Runnable() {
        @Override
        public void run() {
            handshake(promise);
        }
    };
    loop.execute(handshakeTask);
    return promise;
}
/**
 * Triggers {@code finishEncode} on the executor that belongs to this handler's context.
 *
 * @param promise the caller's promise; used directly on the executor thread, otherwise
 *                notified through a {@code ChannelPromiseNotifier}
 * @return {@code finishEncode}'s result when already on the executor thread, else a fresh
 *         promise created from the context
 */
@Override
public ChannelFuture close(final ChannelPromise promise) {
    final ChannelHandlerContext handlerCtx = ctx();
    final EventExecutor loop = handlerCtx.executor();
    if (loop.inEventLoop()) {
        return finishEncode(handlerCtx, promise);
    }

    // Off-thread call: run the encode completion later and bridge its result to the caller.
    final ChannelPromise deferred = handlerCtx.newPromise();
    final Runnable finishTask = new Runnable() {
        @Override
        public void run() {
            ChannelFuture result = finishEncode(ctx(), deferred);
            result.addListener(new ChannelPromiseNotifier(promise));
        }
    };
    loop.execute(finishTask);
    return deferred;
}
/**
 * Completes the encoding through {@code finishEncode}, executed on the context's executor.
 *
 * <p>When the caller is not on that executor, the work is queued, a new promise obtained from
 * the context is returned, and {@code promise} is notified once the queued work's future
 * completes.
 *
 * @param promise the promise provided by the caller
 * @return the future described above
 */
@Override
public ChannelFuture close(final ChannelPromise promise) {
    final ChannelHandlerContext owner = ctx();
    final EventExecutor ownerExecutor = owner.executor();
    final boolean onExecutorThread = ownerExecutor.inEventLoop();
    if (onExecutorThread) {
        return finishEncode(owner, promise);
    }
    final ChannelPromise bridge = owner.newPromise();
    ownerExecutor.execute(new Runnable() {
        @Override
        public void run() {
            // Re-resolve the context at run time and forward the outcome to the caller's promise.
            finishEncode(ctx(), bridge).addListener(new ChannelPromiseNotifier(promise));
        }
    });
    return bridge;
}
/**
 * Close this {@link Lz4FrameEncoder} and so finish the encoding.
 * The given {@link ChannelPromise} is handed to {@code finishEncode}, which is always invoked
 * on the executor of this handler's context.
 *
 * @param promise the promise passed to {@code finishEncode}
 * @return the future returned by {@code finishEncode} when called on the executor thread,
 *         otherwise {@code promise} itself
 */
public ChannelFuture close(final ChannelPromise promise) {
    ChannelHandlerContext ctx = ctx();
    EventExecutor executor = ctx.executor();
    if (executor.inEventLoop()) {
        return finishEncode(ctx, promise);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                ChannelFuture f = finishEncode(ctx(), promise);
                // finishEncode already received the promise. Only bridge the result when it
                // handed back a different future; otherwise the promise would be asked to
                // complete itself a second time from its own listener.
                if (f != promise) {
                    f.addListener(new ChannelPromiseNotifier(promise));
                }
            }
        });
        return promise;
    }
}
/**
 * Runs {@code finishEncode} on the executor of this handler's context.
 *
 * @param promise the caller's promise
 * @return the result of {@code finishEncode(ctx, promise)} if the caller is on the executor
 *         thread; otherwise a promise created via {@code ctx.newPromise()} that is passed to
 *         the deferred {@code finishEncode} call
 */
@Override
public ChannelFuture close(final ChannelPromise promise) {
    final ChannelHandlerContext encoderCtx = ctx();
    final EventExecutor encoderExecutor = encoderCtx.executor();

    if (!encoderExecutor.inEventLoop()) {
        final ChannelPromise pending = encoderCtx.newPromise();
        final Runnable task = new Runnable() {
            @Override
            public void run() {
                ChannelFuture done = finishEncode(ctx(), pending);
                // Propagate completion to the promise the caller originally supplied.
                done.addListener(new ChannelPromiseNotifier(promise));
            }
        };
        encoderExecutor.execute(task);
        return pending;
    }

    return finishEncode(encoderCtx, promise);
}
/**
 * One-time setup: flips {@code state} to 1, stores the current nano time as the last write
 * time and schedules a {@code WriterIdleTimeoutTask} after {@code writerIdleTimeNanos}.
 * Returns without scheduling anything when {@code state} is already 1 or 2.
 *
 * @param ctx the context providing the channel (for logging) and the scheduling executor
 */
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    if (logger.isTraceEnabled()) {
        logger.trace("Initializing autoflush handler on channel {} Cid: {}", ctx.channel(), MqttUtil.clientID(ctx.channel()));
    }
    // 1 is what this method sets once it ran; 2 is presumably the "destroyed" marker - TODO confirm.
    final int observedState = state;
    final boolean alreadyHandled = observedState == 1 || observedState == 2;
    if (alreadyHandled) {
        return;
    }
    state = 1;
    final EventExecutor scheduler = ctx.executor();
    lastWriteTime = System.nanoTime();
    writerIdleTimeout = scheduler.schedule(
            new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
/**
 * Initializes the handler once: sets {@code state} to 1, remembers the current nano time as
 * the last execution time and schedules a {@code WriterIdleTimeoutTask} to fire after
 * {@code resenderTimeNanos}. A {@code state} of 1 or 2 makes this a no-op after logging.
 *
 * @param ctx the context whose channel is logged and whose executor schedules the task
 */
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    if (logger.isDebugEnabled()) {
        logger.debug("Initializing autoflush handler on channel {}", ctx.channel());
    }
    final int snapshot = state;
    if (snapshot == 1 || snapshot == 2) {
        // Nothing to do for these states (1 is assigned below; meaning of 2 not visible here).
        return;
    }
    state = 1;
    final EventExecutor timerExecutor = ctx.executor();
    lastExecutionTime = System.nanoTime();
    resenderTimeout = timerExecutor.schedule(
            new WriterIdleTimeoutTask(ctx), resenderTimeNanos, TimeUnit.NANOSECONDS);
}
/**
 * Sets up the per-connection message source when the channel becomes active.
 * <p>
 * Stores the context, creates a {@code DataModuleMessageSource} bound to the context's
 * executor and a {@code ContextChannelWriter}, registers it with the message channel,
 * creates the data listener, starts the message channel timers and finally delegates to
 * the superclass.
 *
 * @param ctx the context of the channel that became active
 * @throws Exception if the superclass implementation fails
 */
@Override
public void channelActive ( final ChannelHandlerContext ctx ) throws Exception
{
logger.debug ( "Channel active - {}", ctx );
// we may and must remember the channel context, to know where to send notifications to
this.ctx = ctx;
// the source is created before it is registered with the message channel and wrapped by the listener
this.source = new DataModuleMessageSource ( this.options, ctx.executor (), new ContextChannelWriter ( ctx ), this.dataModel, this.backgroundScanPeriod );
this.messageChannel.addSource ( this.source );
this.spontHandler = new DataListenerImpl ( this.source );
// timers are started only after the source has been added
this.messageChannel.startTimers();
super.channelActive ( ctx );
}
/**
 * Drains all pending unprotected writes, hands them to the protector and writes every
 * protected frame it produces to the channel.
 *
 * <p>Each drained write's promise is added to one aggregate promise; every protected frame is
 * written with a new promise obtained from that aggregate.
 *
 * @param ctx the context used for allocation and for writing the protected frames
 * @throws GeneralSecurityException declared for failures of the protection step
 * @throws IllegalStateException if there are pending writes but the protector is {@code null}
 */
@Override
@SuppressWarnings("FutureReturnValueIgnored") // for aggregatePromise.doneAllocatingPromises
public void flush(final ChannelHandlerContext ctx) throws GeneralSecurityException {
    // Return early if there's nothing to write. Otherwise protector.protectFlush() below may
    // not check for "no-data" and go on writing the 0-byte "data" to the socket with the
    // protection framing.
    final boolean nothingPending =
            pendingUnprotectedWrites == null || pendingUnprotectedWrites.isEmpty();
    if (nothingPending) {
        return;
    }

    // Flushes can happen after close, but only when there are no pending writes.
    checkState(protector != null, "flush() called after close()");

    final ProtectedPromise aggregatePromise =
            new ProtectedPromise(ctx.channel(), ctx.executor(), pendingUnprotectedWrites.size());
    final List<ByteBuf> unprotected = new ArrayList<>(pendingUnprotectedWrites.size());

    // Drain the unprotected writes.
    while (!pendingUnprotectedWrites.isEmpty()) {
        final ByteBuf head = (ByteBuf) pendingUnprotectedWrites.current();
        // Keep our own reference before the queue gives up its one.
        unprotected.add(head.retain());
        // Remove and release the buffer and add its promise to the aggregate.
        aggregatePromise.addUnprotectedPromise(pendingUnprotectedWrites.remove());
    }

    // Sink for the protector: every protected frame goes straight to the channel.
    final Consumer<ByteBuf> protectedFrameWriter = new Consumer<ByteBuf>() {
        @Override
        public void accept(ByteBuf byteBuf) {
            ctx.writeAndFlush(byteBuf, aggregatePromise.newPromise());
        }
    };
    protector.protectFlush(unprotected, protectedFrameWriter, ctx.alloc());

    // We're done writing, start the flow of promise events.
    aggregatePromise.doneAllocatingPromises();
}
/**
 * Processes an incoming request as an asynchronous pipeline: build the web context, execute
 * the logic and build the response on the context's executor, map any failure through
 * {@code handleException}, then write the result on the channel's event loop.
 *
 * @param ctx         the handler context
 * @param httpRequest the request to process
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) {
    final Executor handlerExecutor = ctx.executor();
    final CompletableFuture<HttpRequest> requestFuture = CompletableFuture.completedFuture(httpRequest);

    requestFuture
            .thenApplyAsync(req -> buildWebContext(ctx, req), handlerExecutor)
            .thenApplyAsync(this::executeLogic, handlerExecutor)
            .thenApplyAsync(this::buildResponse, handlerExecutor)
            // Failures from any stage above are turned into a value by handleException.
            .exceptionally(this::handleException)
            // The write itself is dispatched to the channel's own event loop.
            .thenAcceptAsync(msg -> writeResponse(ctx, requestFuture, msg), ctx.channel().eventLoop());
}
/**
 * Stores the handler context after verifying that the context's executor is the very same
 * object as the channel's event loop.
 *
 * @param ctx the context this handler was added with
 * @throws IllegalStateException if the context's executor is not the channel's event loop
 */
@Override
public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
    // Identity comparison on purpose: a different executor instance is rejected.
    final boolean runsOnChannelLoop = ctx.executor() == ctx.channel().eventLoop();
    if (!runsOnChannelLoop) {
        throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
    }
    this.ctx = ctx;
}
/**
 * One-time setup of the idle timers. Sets {@code state} to 1, stamps the last read and write
 * times with the current nano time and schedules the reader, writer and all-idle tasks for
 * every timeout that is configured with a positive value. Does nothing when {@code state}
 * is already 1 or 2.
 *
 * @param ctx the context whose executor schedules the tasks
 */
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    final int currentState = state;
    if (currentState == 1 || currentState == 2) {
        return;
    }
    state = 1;

    final EventExecutor scheduler = ctx.executor();
    final long now = System.nanoTime();
    lastWriteTime = now;
    lastReadTime = now;

    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = scheduler.schedule(
                new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = scheduler.schedule(
                new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = scheduler.schedule(
                new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}
/**
 * Creates and starts the traffic counter for this handler when it is added to a pipeline.
 * The counter is named {@code "ChannelTC"} followed by the channel's hash code and uses the
 * context's executor together with the configured check interval.
 *
 * @param ctx the context the handler was added with
 * @throws Exception if the superclass implementation fails
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    final TrafficCounter channelCounter = new TrafficCounter(
            this,
            ctx.executor(),
            "ChannelTC" + ctx.channel().hashCode(),
            checkInterval);
    // Register the counter first, then start it, and only afterwards run the base-class hook.
    setTrafficCounter(channelCounter);
    channelCounter.start();
    super.handlerAdded(ctx);
}
/**
 * Writes a PUSH_PROMISE frame, followed by continuation frames when the encoded header
 * block does not fit into the first frame.
 * <p>
 * All individual writes share one {@code SimpleChannelPromiseAggregator} built around
 * {@code promise}. An {@code Http2Exception} fails the aggregate without being rethrown; any
 * other {@code Throwable} fails the aggregate and is rethrown.
 *
 * @param ctx the context used for buffer allocation and writing
 * @param streamId the stream the frame is written on
 * @param promisedStreamId the stream ID being promised
 * @param headers the headers to encode
 * @param padding the padding value; the frame's padded flag is set when it is greater than 0
 * @param promise the promise wrapped by the aggregator
 * @return the future returned by the aggregator's {@code doneAllocatingPromises()}
 */
@Override
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) {
ByteBuf headerBlock = null;
SimpleChannelPromiseAggregator promiseAggregator =
new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor());
try {
// Validate the arguments before anything is allocated.
verifyStreamId(streamId, STREAM_ID);
verifyStreamId(promisedStreamId, "Promised Stream ID");
verifyPadding(padding);
// Encode the entire header block into an intermediate buffer.
headerBlock = ctx.alloc().buffer();
headersEncoder.encodeHeaders(streamId, headers, headerBlock);
// Read the first fragment (possibly everything).
Http2Flags flags = new Http2Flags().paddingPresent(padding > 0);
// INT_FIELD_LENGTH is for the length of the promisedStreamId
int nonFragmentLength = INT_FIELD_LENGTH + padding;
int maxFragmentLength = maxFrameSize - nonFragmentLength;
// NOTE(review): the retained slice is only handed off at ctx.write(fragment, ...) below; no
// release of it is visible here if an exception occurs in between - confirm.
ByteBuf fragment = headerBlock.readRetainedSlice(min(headerBlock.readableBytes(), maxFragmentLength));
// End-of-headers is set only when the first fragment consumed the whole header block.
flags.endOfHeaders(!headerBlock.isReadable());
int payloadLength = fragment.readableBytes() + nonFragmentLength;
ByteBuf buf = ctx.alloc().buffer(PUSH_PROMISE_FRAME_HEADER_LENGTH);
writeFrameHeaderInternal(buf, payloadLength, PUSH_PROMISE, flags, streamId);
writePaddingLength(buf, padding);
// Write out the promised stream ID.
buf.writeInt(promisedStreamId);
ctx.write(buf, promiseAggregator.newPromise());
// Write the first fragment.
ctx.write(fragment, promiseAggregator.newPromise());
// Write out the padding, if any.
if (paddingBytes(padding) > 0) {
ctx.write(ZERO_BUFFER.slice(0, paddingBytes(padding)), promiseAggregator.newPromise());
}
// Whatever is left in headerBlock goes out as continuation frames.
if (!flags.endOfHeaders()) {
writeContinuationFrames(ctx, streamId, headerBlock, padding, promiseAggregator);
}
} catch (Http2Exception e) {
// Reported through the promise only; falls through to the return statement below.
promiseAggregator.setFailure(e);
} catch (Throwable t) {
// Fail and finish the aggregate here, then rethrow; the return below is not reached.
promiseAggregator.setFailure(t);
promiseAggregator.doneAllocatingPromises();
PlatformDependent.throwException(t);
} finally {
// Release this method's reference to the intermediate header buffer on every path.
if (headerBlock != null) {
headerBlock.release();
}
}
return promiseAggregator.doneAllocatingPromises();
}
/**
 * Writes a HEADERS frame, optionally with priority information, followed by continuation
 * frames when the encoded header block does not fit into the first frame.
 * <p>
 * All individual writes share one {@code SimpleChannelPromiseAggregator} built around
 * {@code promise}. An {@code Http2Exception} fails the aggregate without being rethrown; any
 * other {@code Throwable} fails the aggregate and is rethrown.
 *
 * @param ctx the context used for buffer allocation and writing
 * @param streamId the stream the frame is written on
 * @param headers the headers to encode
 * @param padding the padding value; the frame's padded flag is set when it is greater than 0
 * @param endStream value of the end-of-stream flag
 * @param hasPriority whether the priority fields are validated and written
 * @param streamDependency the stream dependency written when {@code hasPriority} is true
 * @param weight the weight; written as {@code weight - 1} when {@code hasPriority} is true
 * @param exclusive whether the top bit of the dependency field is set
 * @param promise the promise wrapped by the aggregator
 * @return the future returned by the aggregator's {@code doneAllocatingPromises()}
 */
private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx,
int streamId, Http2Headers headers, int padding, boolean endStream,
boolean hasPriority, int streamDependency, short weight, boolean exclusive, ChannelPromise promise) {
ByteBuf headerBlock = null;
SimpleChannelPromiseAggregator promiseAggregator =
new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor());
try {
verifyStreamId(streamId, STREAM_ID);
// NOTE(review): padding is only verified inside this branch, i.e. not when hasPriority is
// false - confirm that this is intended.
if (hasPriority) {
verifyStreamOrConnectionId(streamDependency, STREAM_DEPENDENCY);
verifyPadding(padding);
verifyWeight(weight);
}
// Encode the entire header block.
headerBlock = ctx.alloc().buffer();
headersEncoder.encodeHeaders(streamId, headers, headerBlock);
Http2Flags flags =
new Http2Flags().endOfStream(endStream).priorityPresent(hasPriority).paddingPresent(padding > 0);
// Read the first fragment (possibly everything).
int nonFragmentBytes = padding + flags.getNumPriorityBytes();
int maxFragmentLength = maxFrameSize - nonFragmentBytes;
// NOTE(review): the retained slice is only handed off at ctx.write(fragment, ...) below; no
// release of it is visible here if an exception occurs in between - confirm.
ByteBuf fragment = headerBlock.readRetainedSlice(min(headerBlock.readableBytes(), maxFragmentLength));
// Set the end of headers flag for the first frame.
flags.endOfHeaders(!headerBlock.isReadable());
int payloadLength = fragment.readableBytes() + nonFragmentBytes;
ByteBuf buf = ctx.alloc().buffer(HEADERS_FRAME_HEADER_LENGTH);
writeFrameHeaderInternal(buf, payloadLength, HEADERS, flags, streamId);
writePaddingLength(buf, padding);
if (hasPriority) {
// The exclusive flag is carried in the most significant bit of the dependency field.
buf.writeInt(exclusive ? (int) (0x80000000L | streamDependency) : streamDependency);
// Adjust the weight so that it fits into a single byte on the wire.
buf.writeByte(weight - 1);
}
ctx.write(buf, promiseAggregator.newPromise());
// Write the first fragment.
ctx.write(fragment, promiseAggregator.newPromise());
// Write out the padding, if any.
if (paddingBytes(padding) > 0) {
ctx.write(ZERO_BUFFER.slice(0, paddingBytes(padding)), promiseAggregator.newPromise());
}
// Whatever is left in headerBlock goes out as continuation frames.
if (!flags.endOfHeaders()) {
writeContinuationFrames(ctx, streamId, headerBlock, padding, promiseAggregator);
}
} catch (Http2Exception e) {
// Reported through the promise only; falls through to the return statement below.
promiseAggregator.setFailure(e);
} catch (Throwable t) {
// Fail and finish the aggregate here, then rethrow; the return below is not reached.
promiseAggregator.setFailure(t);
promiseAggregator.doneAllocatingPromises();
PlatformDependent.throwException(t);
} finally {
// Release this method's reference to the intermediate header buffer on every path.
if (headerBlock != null) {
headerBlock.release();
}
}
return promiseAggregator.doneAllocatingPromises();
}
/**
 * Handles inbound messages, turning messages of the configured inbound class into either
 * complete or streamed messages for the next handler.
 * <ul>
 * <li>A {@code FullHttpMessage} is forwarded unchanged.</li>
 * <li>A message for which {@code hasBody} is false is forwarded as an empty message, and
 * {@code ignoreBodyRead} is set.</li>
 * <li>Any other message of the inbound class is forwarded as a streamed message backed by a
 * {@code HandlerPublisher} that is inserted into the pipeline right after this handler.</li>
 * <li>{@code HttpContent} that is not of the inbound class goes to
 * {@code handleReadHttpContent}.</li>
 * </ul>
 * NOTE(review): a message matching neither branch is not forwarded or otherwise handled
 * here - confirm that this is intended.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 */
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (inClass.isInstance(msg)) {
receivedInMessage(ctx);
InT inMsg = inClass.cast(msg);
if (inMsg instanceof FullHttpMessage) {
// Forward as is
ctx.fireChannelRead(inMsg);
consumedInMessage(ctx);
} else if (!hasBody(inMsg)) {
// Wrap in empty message
ctx.fireChannelRead(createEmptyMessage(inMsg));
consumedInMessage(ctx);
// There will be a LastHttpContent message coming after this, ignore it
ignoreBodyRead = true;
} else {
currentlyStreamedMessage = inMsg;
// It has a body, stream it
HandlerPublisher<HttpContent> publisher = new HandlerPublisher<HttpContent>(ctx.executor(), HttpContent.class) {
@Override
protected void cancelled() {
// handleCancelled is always run on the context's executor thread.
if (ctx.executor().inEventLoop()) {
handleCancelled(ctx, inMsg);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
handleCancelled(ctx, inMsg);
}
});
}
}
@Override
protected void requestDemand() {
// Notify this handler first, then let the publisher do its regular work.
bodyRequested(ctx);
super.requestDemand();
}
};
// The publisher is added to the pipeline before the streamed message is fired downstream.
ctx.channel().pipeline().addAfter(ctx.name(), ctx.name() + "-body-publisher", publisher);
ctx.fireChannelRead(createStreamedMessage(inMsg, publisher));
}
} else if (msg instanceof HttpContent) {
handleReadHttpContent(ctx, (HttpContent) msg);
}
}