下面列出了io.netty.channel.ChannelHandlerContext#isRemoved ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void execute(ChannelHandlerContext ctx, Client client) {
if (!client.isAuth) {
sendError("Access denied");
return;
}
if (!client.permissions.isPermission(ClientPermissions.PermissionConsts.ADMIN)) {
sendError("Access denied");
return;
}
if (client.logOutput != null) {
LogHelper.info("Client %s remove log listener", client.username);
LogHelper.removeOutput(client.logOutput);
} else {
LogHelper.info("Client %s add log listener", client.username);
LogHelper.Output output = (str) -> {
if (!ctx.isRemoved()) {
service.sendObject(ctx, new LogEvent(str));
} else {
LogHelper.removeOutput(client.logOutput);
LogHelper.info("Client %s remove log listener", client.username);
}
};
client.logOutput = new LogHelper.OutputEnity(output, outputType);
LogHelper.addOutput(client.logOutput);
}
}
/**
* Called once data should be decoded from the given {@link ByteBuf}. This method will call
* {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
*/
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass())
+ ".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
synchronized (pendings) {
pendings.add(msg);
}
if (ctx.isRemoved()) {
flushPendings(ctx);
ctx.flush();
}
}
/**
* Called once data should be decoded from the given {@link ByteBuf}. This method will call
* {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
*/
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
protected void removeInternalHandler(ChannelHandlerContext ctx)
{
// Remove timeout handler if not already removed.
ChannelHandlerContext handlerContext = ctx.pipeline().context(INTERNAL_HANDLER_NAME);
if (handlerContext != null && ! handlerContext.isRemoved()) {
ctx.pipeline().remove(INTERNAL_HANDLER_NAME);
}
}
private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
ByteBuf out = null;
ChannelPromise promise = null;
ByteBufAllocator alloc = ctx.alloc();
boolean needUnwrap = false;
ByteBuf buf = null;
try {
final int wrapDataSize = this.wrapDataSize;
// Only continue to loop if the handler was not removed in the meantime.
// See https://github.com/netty/netty/issues/5860
while (!ctx.isRemoved()) {
promise = ctx.newPromise();
buf = wrapDataSize > 0 ?
pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
pendingUnencryptedWrites.removeFirst(promise);
if (buf == null) {
break;
}
if (out == null) {
out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
}
SSLEngineResult result = wrap(alloc, engine, buf, out);
if (result.getStatus() == Status.CLOSED) {
buf.release();
buf = null;
promise.tryFailure(SSLENGINE_CLOSED);
promise = null;
// SSLEngine has been closed already.
// Any further write attempts should be denied.
pendingUnencryptedWrites.releaseAndFailAll(ctx, SSLENGINE_CLOSED);
return;
} else {
if (buf.isReadable()) {
pendingUnencryptedWrites.addFirst(buf, promise);
// When we add the buffer/promise pair back we need to be sure we don't complete the promise
// later in finishWrap. We only complete the promise if the buffer is completely consumed.
promise = null;
} else {
buf.release();
}
buf = null;
switch (result.getHandshakeStatus()) {
case NEED_TASK:
runDelegatedTasks();
break;
case FINISHED:
setHandshakeSuccess();
// deliberate fall-through
case NOT_HANDSHAKING:
setHandshakeSuccessIfStillHandshaking();
// deliberate fall-through
case NEED_WRAP:
finishWrap(ctx, out, promise, inUnwrap, false);
promise = null;
out = null;
break;
case NEED_UNWRAP:
needUnwrap = true;
return;
default:
throw new IllegalStateException(
"Unknown handshake status: " + result.getHandshakeStatus());
}
}
}
} finally {
// Ownership of buffer was not transferred, release it.
if (buf != null) {
buf.release();
}
finishWrap(ctx, out, promise, inUnwrap, needUnwrap);
}
}
/**
* This method will not call
* {@link #setHandshakeFailure(ChannelHandlerContext, Throwable, boolean, boolean, boolean)} or
* {@link #setHandshakeFailure(ChannelHandlerContext, Throwable)}.
* @return {@code true} if this method ends on {@link SSLEngineResult.HandshakeStatus#NOT_HANDSHAKING}.
*/
private boolean wrapNonAppData(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
ByteBuf out = null;
ByteBufAllocator alloc = ctx.alloc();
try {
// Only continue to loop if the handler was not removed in the meantime.
// See https://github.com/netty/netty/issues/5860
while (!ctx.isRemoved()) {
if (out == null) {
// As this is called for the handshake we have no real idea how big the buffer needs to be.
// That said 2048 should give us enough room to include everything like ALPN / NPN data.
// If this is not enough we will increase the buffer in wrap(...).
out = allocateOutNetBuf(ctx, 2048, 1);
}
SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
if (result.bytesProduced() > 0) {
ctx.write(out);
if (inUnwrap) {
needsFlush = true;
}
out = null;
}
switch (result.getHandshakeStatus()) {
case FINISHED:
setHandshakeSuccess();
return false;
case NEED_TASK:
runDelegatedTasks();
break;
case NEED_UNWRAP:
if (inUnwrap) {
// If we asked for a wrap, the engine requested an unwrap, and we are in unwrap there is
// no use in trying to call wrap again because we have already attempted (or will after we
// return) to feed more data to the engine.
return false;
}
unwrapNonAppData(ctx);
break;
case NEED_WRAP:
break;
case NOT_HANDSHAKING:
setHandshakeSuccessIfStillHandshaking();
// Workaround for TLS False Start problem reported at:
// https://github.com/netty/netty/issues/1108#issuecomment-14266970
if (!inUnwrap) {
unwrapNonAppData(ctx);
}
return true;
default:
throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
}
if (result.bytesProduced() == 0) {
break;
}
// It should not consume empty buffers when it is not handshaking
// Fix for Android, where it was encrypting empty buffers even when not handshaking
if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
break;
}
}
} finally {
if (out != null) {
out.release();
}
}
return false;
}
/**
* Called once data should be decoded from the given {@link ByteBuf}. This method will call
* {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
* 一旦数据应该从给定的ByteBuf解码时调用。只要进行解码,此方法将调用decode(ChannelHandlerContext, ByteBuf, List)。
*/
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.//在继续解码之前,检查这个处理程序是否已被删除。
//如果它被移除,继续对缓冲区进行操作是不安全的
//
// See:
// - https://github.com/netty/netty/issues/4635
// 检查pipeline中handler是否花已移除
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// 对byteBuf的数据进行解码
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.//在继续循环之前,检查这个处理程序是否被删除。
//如果它被移除,继续对缓冲区进行操作是不安全的。
//
// See https://github.com/netty/netty/issues/1664
// handler是否被删除
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (ctx.isRemoved()) {
return;
}
// Will use the first N bytes to detect a protocol depending on the protocol.
if (in.readableBytes() < 8) {
return;
}
if (handshakeTimeout > 0 && timeoutFuture != null) {
timeoutFuture.cancel(true);
timeoutFuture = null;
}
final int magic1 = in.getUnsignedByte(in.readerIndex());
final int magic2 = in.getUnsignedByte(in.readerIndex() + 1);
if (http && isHttp(magic1, magic2)) {
switchToHttp(ctx);
return;
}
String protocolToUse = null;
Set<String> protocolSet = protocolMap.keySet();
if (!protocolSet.isEmpty()) {
// Use getBytes(...) as this works with direct and heap buffers.
// See https://issues.jboss.org/browse/HORNETQ-1406
byte[] bytes = new byte[8];
in.getBytes(0, bytes);
for (String protocol : protocolSet) {
ProtocolManager protocolManager = protocolMap.get(protocol);
if (protocolManager.isProtocol(bytes)) {
protocolToUse = protocol;
break;
}
}
}
//if we get here we assume we use the core protocol as we match nothing else
if (protocolToUse == null) {
for (Map.Entry<String, ProtocolManager> entry : protocolMap.entrySet()) {
if (entry.getValue().acceptsNoHandshake()) {
protocolToUse = entry.getKey();
break;
}
}
if (protocolToUse == null) {
protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL;
}
}
ProtocolManager protocolManagerToUse = protocolMap.get(protocolToUse);
if (protocolManagerToUse == null) {
ActiveMQServerLogger.LOGGER.failedToFindProtocolManager(ctx.channel() == null ? null : ctx.channel().remoteAddress() == null ? null : ctx.channel().remoteAddress().toString(), ctx.channel() == null ? null : ctx.channel().localAddress() == null ? null : ctx.channel().localAddress().toString(), protocolToUse, protocolMap.keySet().toString());
return;
}
ConnectionCreator channelHandler = nettyAcceptor.createConnectionCreator();
ChannelPipeline pipeline = ctx.pipeline();
protocolManagerToUse.addChannelHandlers(pipeline);
pipeline.addLast("handler", channelHandler);
NettyServerConnection connection = channelHandler.createConnection(ctx, protocolToUse, httpEnabled);
protocolManagerToUse.handshake(connection, new ChannelBufferWrapper(in));
pipeline.remove(this);
ctx.flush();
}