下面列出了io.netty.channel.ChannelHandlerContext#alloc ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBufAllocator allocator = ctx.alloc();
byte []data = null;
if(msg instanceof String){
data = ((String)msg).getBytes(charset);
}else if(msg instanceof byte[]){
data = (byte[])msg;
}
if(data != null){
ByteBuf byteBuf = allocator.buffer(data.length);
byteBuf.writeBytes(data);
super.write(ctx, byteBuf, promise);
return;
}
super.write(ctx, msg, promise);
}
/**
* Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies
* in {@link OpenSslEngine}.当它被合并时,总是倾向于直接缓冲区,这样我们就减少了OpenSslEngine中内存拷贝的数量。
*/
private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
ByteBufAllocator alloc = ctx.alloc();
if (engineType.wantsDirectBuffer) {
return alloc.directBuffer(capacity);
} else {
return alloc.buffer(capacity);
}
}
private static ThriftFrame writeSuccessResponse(
ChannelHandlerContext context,
MethodMetadata methodMetadata,
Transport transport,
Protocol protocol,
int sequenceId,
boolean supportOutOfOrderResponse,
Object result)
throws Exception
{
TChannelBufferOutputTransport outputTransport = new TChannelBufferOutputTransport(context.alloc());
try {
writeResponse(
methodMetadata.getName(),
protocol.createProtocol(outputTransport),
sequenceId,
"success",
(short) 0,
methodMetadata.getResultCodec(),
result);
return new ThriftFrame(
sequenceId,
outputTransport.getBuffer(),
ImmutableMap.of(),
transport,
protocol,
supportOutOfOrderResponse);
}
finally {
outputTransport.release();
}
}
private static ThriftFrame writeApplicationException(
ChannelHandlerContext context,
String methodName,
Transport transport,
Protocol protocol,
int sequenceId,
boolean supportOutOfOrderResponse,
TApplicationException.Type errorCode,
String errorMessage)
throws Exception
{
TApplicationException applicationException = new TApplicationException(errorCode, errorMessage);
TChannelBufferOutputTransport outputTransport = new TChannelBufferOutputTransport(context.alloc());
try {
TProtocolWriter protocolWriter = protocol.createProtocol(outputTransport);
protocolWriter.writeMessageBegin(new TMessage(methodName, EXCEPTION, sequenceId));
ExceptionWriter.writeTApplicationException(applicationException, protocolWriter);
protocolWriter.writeMessageEnd();
return new ThriftFrame(
sequenceId,
outputTransport.getBuffer(),
ImmutableMap.of(),
transport,
protocol,
supportOutOfOrderResponse);
}
finally {
outputTransport.release();
}
}
/**
* Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies
* in {@link OpenSslEngine}.
*/
private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
ByteBufAllocator alloc = ctx.alloc();
if (wantsDirectBuffer) {
return alloc.directBuffer(capacity);
} else {
return alloc.buffer(capacity);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf readMessage = (ByteBuf) msg;
System.out.println("channelRead : " + readMessage.toString(Charset.defaultCharset()));
ByteBufAllocator byteBufAllocator = ctx.alloc();
ByteBuf newBuffer = byteBufAllocator.buffer();
// newBuffer 사용.
ctx.write(msg);
}
private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
ByteBuf out = null;
ChannelPromise promise = null;
ByteBufAllocator alloc = ctx.alloc();
boolean needUnwrap = false;
ByteBuf buf = null;
try {
final int wrapDataSize = this.wrapDataSize;
// Only continue to loop if the handler was not removed in the meantime.
// See https://github.com/netty/netty/issues/5860
while (!ctx.isRemoved()) {
promise = ctx.newPromise();
buf = wrapDataSize > 0 ?
pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
pendingUnencryptedWrites.removeFirst(promise);
if (buf == null) {
break;
}
if (out == null) {
out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
}
SSLEngineResult result = wrap(alloc, engine, buf, out);
if (result.getStatus() == Status.CLOSED) {
buf.release();
buf = null;
promise.tryFailure(SSLENGINE_CLOSED);
promise = null;
// SSLEngine has been closed already.
// Any further write attempts should be denied.
pendingUnencryptedWrites.releaseAndFailAll(ctx, SSLENGINE_CLOSED);
return;
} else {
if (buf.isReadable()) {
pendingUnencryptedWrites.addFirst(buf, promise);
// When we add the buffer/promise pair back we need to be sure we don't complete the promise
// later in finishWrap. We only complete the promise if the buffer is completely consumed.
promise = null;
} else {
buf.release();
}
buf = null;
switch (result.getHandshakeStatus()) {
case NEED_TASK:
runDelegatedTasks();
break;
case FINISHED:
setHandshakeSuccess();
// deliberate fall-through
case NOT_HANDSHAKING:
setHandshakeSuccessIfStillHandshaking();
// deliberate fall-through
case NEED_WRAP:
finishWrap(ctx, out, promise, inUnwrap, false);
promise = null;
out = null;
break;
case NEED_UNWRAP:
needUnwrap = true;
return;
default:
throw new IllegalStateException(
"Unknown handshake status: " + result.getHandshakeStatus());
}
}
}
} finally {
// Ownership of buffer was not transferred, release it.
if (buf != null) {
buf.release();
}
finishWrap(ctx, out, promise, inUnwrap, needUnwrap);
}
}
/**
* This method will not call
* {@link #setHandshakeFailure(ChannelHandlerContext, Throwable, boolean, boolean, boolean)} or
* {@link #setHandshakeFailure(ChannelHandlerContext, Throwable)}.
* @return {@code true} if this method ends on {@link SSLEngineResult.HandshakeStatus#NOT_HANDSHAKING}.
*/
private boolean wrapNonAppData(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
ByteBuf out = null;
ByteBufAllocator alloc = ctx.alloc();
try {
// Only continue to loop if the handler was not removed in the meantime.
// See https://github.com/netty/netty/issues/5860
while (!ctx.isRemoved()) {
if (out == null) {
// As this is called for the handshake we have no real idea how big the buffer needs to be.
// That said 2048 should give us enough room to include everything like ALPN / NPN data.
// If this is not enough we will increase the buffer in wrap(...).
out = allocateOutNetBuf(ctx, 2048, 1);
}
SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
if (result.bytesProduced() > 0) {
ctx.write(out);
if (inUnwrap) {
needsFlush = true;
}
out = null;
}
switch (result.getHandshakeStatus()) {
case FINISHED:
setHandshakeSuccess();
return false;
case NEED_TASK:
runDelegatedTasks();
break;
case NEED_UNWRAP:
if (inUnwrap) {
// If we asked for a wrap, the engine requested an unwrap, and we are in unwrap there is
// no use in trying to call wrap again because we have already attempted (or will after we
// return) to feed more data to the engine.
return false;
}
unwrapNonAppData(ctx);
break;
case NEED_WRAP:
break;
case NOT_HANDSHAKING:
setHandshakeSuccessIfStillHandshaking();
// Workaround for TLS False Start problem reported at:
// https://github.com/netty/netty/issues/1108#issuecomment-14266970
if (!inUnwrap) {
unwrapNonAppData(ctx);
}
return true;
default:
throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
}
if (result.bytesProduced() == 0) {
break;
}
// It should not consume empty buffers when it is not handshaking
// Fix for Android, where it was encrypting empty buffers even when not handshaking
if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
break;
}
}
} finally {
if (out != null) {
out.release();
}
}
return false;
}
private static ThriftFrame writeExceptionResponse(ChannelHandlerContext context,
MethodMetadata methodMetadata,
Transport transport,
Protocol protocol,
int sequenceId,
boolean supportOutOfOrderResponse,
Throwable exception)
throws Exception
{
Optional<Short> exceptionId = methodMetadata.getExceptionId(exception.getClass());
if (exceptionId.isPresent()) {
TChannelBufferOutputTransport outputTransport = new TChannelBufferOutputTransport(context.alloc());
try {
TProtocolWriter protocolWriter = protocol.createProtocol(outputTransport);
writeResponse(
methodMetadata.getName(),
protocolWriter,
sequenceId,
"exception",
exceptionId.get(),
methodMetadata.getExceptionCodecs().get(exceptionId.get()),
exception);
return new ThriftFrame(
sequenceId,
outputTransport.getBuffer(),
ImmutableMap.of(),
transport,
protocol,
supportOutOfOrderResponse);
}
finally {
outputTransport.release();
}
}
String message = format("Internal error processing method [%s]", methodMetadata.getName());
TApplicationException.Type type = INTERNAL_ERROR;
if (exception instanceof TApplicationException) {
type = ((TApplicationException) exception).getType().orElse(INTERNAL_ERROR);
}
else {
log.warn(exception, message);
}
return writeApplicationException(
context,
methodMetadata.getName(),
transport,
protocol,
sequenceId,
supportOutOfOrderResponse,
type,
message + ": " + exception.getMessage());
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
alloc = ctx.alloc();
}
private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
ByteBuf out = null;
ChannelPromise promise = null;
ByteBufAllocator alloc = ctx.alloc();
try {
for (;;) {
Object msg = pendingUnencryptedWrites.current();
if (msg == null) {
break;
}
if (!(msg instanceof ByteBuf)) {
pendingUnencryptedWrites.removeAndWrite();
continue;
}
ByteBuf buf = (ByteBuf) msg;
if (out == null) {
out = allocateOutNetBuf(ctx, buf.readableBytes());
}
SSLEngineResult result = wrap(alloc, engine, buf, out);
if (!buf.isReadable()) {
promise = pendingUnencryptedWrites.remove();
} else {
promise = null;
}
if (result.getStatus() == Status.CLOSED) {
// SSLEngine has been closed already.
// Any further write attempts should be denied.
pendingUnencryptedWrites.removeAndFailAll(SSLENGINE_CLOSED);
return;
} else {
switch (result.getHandshakeStatus()) {
case NEED_TASK:
runDelegatedTasks();
break;
case FINISHED:
setHandshakeSuccess();
// deliberate fall-through
case NOT_HANDSHAKING:
setHandshakeSuccessIfStillHandshaking();
// deliberate fall-through
case NEED_WRAP:
finishWrap(ctx, out, promise, inUnwrap);
promise = null;
out = null;
break;
case NEED_UNWRAP:
return;
default:
throw new IllegalStateException(
"Unknown handshake status: " + result.getHandshakeStatus());
}
}
}
} catch (SSLException e) {
setHandshakeFailure(ctx, e);
throw e;
} finally {
finishWrap(ctx, out, promise, inUnwrap);
}
}
private void wrapNonAppData(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
ByteBuf out = null;
ByteBufAllocator alloc = ctx.alloc();
try {
for (;;) {
if (out == null) {
out = allocateOutNetBuf(ctx, 0);
}
SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
if (result.bytesProduced() > 0) {
ctx.write(out);
if (inUnwrap) {
needsFlush = true;
}
out = null;
}
switch (result.getHandshakeStatus()) {
case FINISHED:
setHandshakeSuccess();
break;
case NEED_TASK:
runDelegatedTasks();
break;
case NEED_UNWRAP:
if (!inUnwrap) {
unwrapNonAppData(ctx);
}
break;
case NEED_WRAP:
break;
case NOT_HANDSHAKING:
setHandshakeSuccessIfStillHandshaking();
// Workaround for TLS False Start problem reported at:
// https://github.com/netty/netty/issues/1108#issuecomment-14266970
if (!inUnwrap) {
unwrapNonAppData(ctx);
}
break;
default:
throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
}
if (result.bytesProduced() == 0) {
break;
}
}
} catch (SSLException e) {
setHandshakeFailure(ctx, e);
throw e;
} finally {
if (out != null) {
out.release();
}
}
}