下面列出了 io.netty.handler.codec.ByteToMessageDecoder 的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Opens a client connection to {@code 127.0.0.1} on {@code port} (defined outside this
 * method) and installs the given decoder as the last handler of the pipeline.
 *
 * <p>Pipeline order: read and write timeout handlers (600 seconds each), the protobuf
 * varint32 length prepender, the protobuf varint32 frame decoder, then {@code decoder}.
 *
 * <p>The single-threaded event loop group created here is shut down when the returned
 * channel closes, and also when the connection attempt fails, so its thread is not leaked.
 *
 * @param decoder handler registered under the name {@code "handshakeHandler"}
 * @return the connected channel
 * @throws InterruptedException if the calling thread is interrupted while waiting for the
 *     connection to be established
 */
public static Channel connect(ByteToMessageDecoder decoder) throws InterruptedException {
    final NioEventLoopGroup group = new NioEventLoopGroup(1);
    final Bootstrap b = new Bootstrap();
    b.group(group).channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(256 * 1024));
                ch.config().setOption(ChannelOption.SO_RCVBUF, 256 * 1024);
                // SO_BACKLOG is deliberately not set: it is a server-socket option and is
                // not supported by a client socket channel.
                ch.pipeline()
                    .addLast("readTimeoutHandler", new ReadTimeoutHandler(600, TimeUnit.SECONDS))
                    .addLast("writeTimeoutHandler", new WriteTimeoutHandler(600, TimeUnit.SECONDS));
                ch.pipeline().addLast("protoPender", new ProtobufVarint32LengthFieldPrepender());
                ch.pipeline().addLast("lengthDecode", new ProtobufVarint32FrameDecoder());
                ch.pipeline().addLast("handshakeHandler", decoder);
            }
        }).option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
        .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, DefaultMessageSizeEstimator.DEFAULT);

    boolean connected = false;
    try {
        final Channel channel = b.connect("127.0.0.1", port).sync().channel();
        // Release the event loop thread once the connection is gone.
        channel.closeFuture().addListener(future -> group.shutdownGracefully());
        connected = true;
        return channel;
    } finally {
        if (!connected) {
            // Connect failed or was interrupted: nothing else will ever shut the group down.
            group.shutdownGracefully();
        }
    }
}
/**
 * Runs the wrapped initializer reflectively and then replaces the {@code "encoder"} and
 * {@code "decoder"} handlers with wrappers built around the originals, finally adding a
 * packet handler after {@code "packet_handler"}.
 *
 * @param socketChannel the channel being initialised
 * @throws Exception if the reflective call or any pipeline operation fails
 */
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    final UserConnection connection = new UserConnection(socketChannel);
    // Initialise the protocol pipeline for this connection; the instance is not kept here.
    new ProtocolPipeline(connection);

    // Let the original initializer populate the pipeline first.
    this.method.invoke(this.original, socketChannel);

    final HandlerConstructor handlerFactory = ClassGenerator.getConstructor();

    // Wrap the handlers the original initializer installed.
    final MessageToByteEncoder originalEncoder =
        (MessageToByteEncoder) socketChannel.pipeline().get("encoder");
    final MessageToByteEncoder wrappedEncoder =
        handlerFactory.newEncodeHandler(connection, originalEncoder);
    final ByteToMessageDecoder originalDecoder =
        (ByteToMessageDecoder) socketChannel.pipeline().get("decoder");
    final ByteToMessageDecoder wrappedDecoder =
        handlerFactory.newDecodeHandler(connection, originalDecoder);
    final BukkitPacketHandler packetHandler = new BukkitPacketHandler(connection);

    socketChannel.pipeline().replace("encoder", "encoder", wrappedEncoder);
    socketChannel.pipeline().replace("decoder", "decoder", wrappedDecoder);
    socketChannel.pipeline().addAfter("packet_handler", "viaversion_packet_handler", packetHandler);
}
/**
 * Runs the wrapped initializer reflectively. When the server protocol is known and the
 * channel is a {@code SocketChannel}, the {@code "encoder"} and {@code "decoder"} handlers
 * are additionally replaced with wrappers and a packet handler is added after
 * {@code "packet_handler"}.
 *
 * @param channel the channel being initialised
 * @throws Exception if the reflective call or any pipeline operation fails
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    // Ensure ViaVersion is loaded
    final boolean protocolKnown = ProtocolRegistry.SERVER_PROTOCOL != -1;
    // channel can be LocalChannel on internal server
    if (!protocolKnown || !(channel instanceof SocketChannel)) {
        this.method.invoke(this.original, channel);
        return;
    }

    final UserConnection connection = new UserConnection((SocketChannel) channel);
    // Initialise the protocol pipeline for this connection; the instance is not kept here.
    new ProtocolPipeline(connection);

    // Let the original initializer populate the pipeline first.
    this.method.invoke(this.original, channel);

    // Wrap the handlers the original initializer installed.
    final MessageToByteEncoder originalEncoder =
        (MessageToByteEncoder) channel.pipeline().get("encoder");
    final MessageToByteEncoder wrappedEncoder = new SpongeEncodeHandler(connection, originalEncoder);
    final ByteToMessageDecoder originalDecoder =
        (ByteToMessageDecoder) channel.pipeline().get("decoder");
    final ByteToMessageDecoder wrappedDecoder = new SpongeDecodeHandler(connection, originalDecoder);
    final SpongePacketHandler packetHandler = new SpongePacketHandler(connection);

    channel.pipeline().replace("encoder", "encoder", wrappedEncoder);
    channel.pipeline().replace("decoder", "decoder", wrappedDecoder);
    channel.pipeline().addAfter("packet_handler", "viaversion_packet_handler", packetHandler);
}
/**
 * Inserts {@code HTTP_EXTRACTOR} immediately before the named handler when that handler is a
 * {@code ByteToMessageDecoder}, {@code ByteToMessageCodec} or
 * {@code CombinedChannelDuplexHandler}. Does nothing if an extractor for that name is
 * already present. For persistent connections the extractor is removed again when the
 * connection's {@code onTerminate()} completes.
 *
 * @param c the connection whose pipeline is modified
 * @param name name of the handler the extractor is placed in front of
 * @param handler the handler registered under {@code name}
 */
static void autoAddHttpExtractor(Connection c, String name, ChannelHandler handler){
    final boolean decodesBytes = handler instanceof ByteToMessageDecoder
        || handler instanceof ByteToMessageCodec
        || handler instanceof CombinedChannelDuplexHandler;
    if (!decodesBytes) {
        return;
    }

    final String extractorName = name+"$extractor";
    final boolean alreadyInstalled = c.channel().pipeline().context(extractorName) != null;
    if (alreadyInstalled) {
        return;
    }

    c.channel().pipeline().addBefore(name, extractorName, HTTP_EXTRACTOR);

    if (!c.isPersistent()) {
        return;
    }
    c.onTerminate().subscribe(null, null, () -> c.removeHandler(extractorName));
}
/**
 * Builds a one-shot decoder that reads a single response byte. On {@code 'S'} it removes
 * itself and puts a client SSL handler (using {@code InsecureTrustManagerFactory}) at the
 * front of the pipeline; on any other byte it fires an {@link IllegalStateException}
 * through the pipeline.
 *
 * @return the decoder handling the SSL negotiation response
 */
private ChannelHandler newSslInitiator() {
    return new ByteToMessageDecoder() {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() < 1) {
                // Nothing to inspect yet.
                return;
            }

            final byte response = in.readByte();
            if (response != 'S') {
                ctx.fireExceptionCaught(new IllegalStateException("SSL required but not supported by Postgres"));
                return;
            }

            // SSL supported response: drop this handler first, then install the SSL handler.
            ctx.pipeline().remove(this);
            ctx.pipeline().addFirst(
                SslContextBuilder
                    .forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
                    .build()
                    .newHandler(ctx.alloc()));
        }
    };
}
/**
 * Creates a decoder that wraps each incoming buffer in a {@code BufferReader}, passes it to
 * {@code transformBufferReader}, and emits the result when one is present.
 *
 * @return the adapting decoder
 */
private final ByteToMessageDecoder toBufferReaderDecoder() {
    return new ByteToMessageDecoder() {
        @Override
        protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out)
                throws Exception {
            final BufferReader reader = new BufferReader(in);
            transformBufferReader.apply(reader).ifPresent(decoded -> out.add(decoded));
        }
    };
}
/**
 * Call the decode method on a netty ByteToMessageDecoder
 *
 * <p>The call goes through {@code PipelineUtil.DECODE_METHOD} reflectively. If that method
 * is not accessible, an {@link IllegalStateException} is thrown instead of returning an
 * empty list, because an empty result would silently drop the input.
 *
 * @param decoder The decoder
 * @param ctx     The current context
 * @param input   The packet to decode
 * @return A list of the decoders output
 * @throws InvocationTargetException If an exception happens while executing
 * @throws IllegalStateException     If the decode method cannot be accessed reflectively
 */
public static List<Object> callDecode(ByteToMessageDecoder decoder, ChannelHandlerContext ctx, Object input) throws InvocationTargetException {
    List<Object> output = new ArrayList<>();
    try {
        PipelineUtil.DECODE_METHOD.invoke(decoder, ctx, input, output);
    } catch (IllegalAccessException e) {
        // A reflection setup error, not a decoding problem: fail loudly and keep the cause.
        throw new IllegalStateException("Decode method of " + decoder.getClass().getName() + " is not accessible", e);
    }
    return output;
}
/**
 * Creates a {@code ByteDecoder} configured with this object's message codec and head codec.
 *
 * @return the newly created, configured decoder
 */
@Override
public ByteToMessageDecoder getDecoder() {
    final ByteDecoder byteDecoder = new ByteDecoder();
    byteDecoder.setMessageCodec(msgCodec);
    byteDecoder.setHeadCodec(headCodec);
    return byteDecoder;
}
/**
 * Attaches a session to the newly initialised channel and builds its pipeline.
 *
 * <p>The idle-state handler is installed at the head of the pipeline. An
 * {@code IdleStateHandler} only sees reads that pass through it and forwards its idle events
 * to the handlers after it, so when it was the last handler no other handler could receive
 * the event and it only observed reads that earlier handlers forwarded.
 *
 * @param ch the channel being initialised
 * @throws Exception if the pipeline cannot be set up
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    // Initialize our session Object when the channel is initialized, attach
    // it to the channel.
    ch.attr(NetworkConstants.SESSION_KEY).setIfAbsent(new PlayerIO(ch));

    // Initialize the pipeline channel handlers.
    ChannelDuplexHandler timeout = new IdleStateHandler(NetworkConstants.INPUT_TIMEOUT, 0, 0);
    ByteToMessageDecoder loginHandshakeHandler = new LoginHandshakeHandler();

    // Timeout first so that it observes raw reads and its events reach the handlers below.
    ch.pipeline().addLast("timeout", timeout);
    ch.pipeline().addLast("login-handshake", loginHandshakeHandler);
    ch.pipeline().addLast("channel-handler", channelHandler);
}
/**
 * Builds the HTTP server pipeline for a channel: open-channel tracking, read timeout,
 * request decoder (composite cumulator), content decompressor, response encoder, object
 * aggregator, optional response compressor, the main/static-file handler, registry-provided
 * items and, when CORS is enabled in the settings, a CORS handler placed after
 * {@code "encoder"}.
 *
 * @param ch the channel being initialised
 * @throws Exception if a handler cannot be created or added
 */
@Override
public void initChannel(Channel ch) throws Exception {
    ch.pipeline()
        .addLast("openChannels", transport.serverOpenChannels)
        .addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));

    final HttpRequestDecoder requestDecoder = new HttpRequestDecoder(
        Math.toIntExact(transport.maxInitialLineLength.getBytes()),
        Math.toIntExact(transport.maxHeaderSize.getBytes()),
        Math.toIntExact(transport.maxChunkSize.getBytes()));
    requestDecoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
    ch.pipeline()
        .addLast("decoder", requestDecoder)
        .addLast("decoder_compress", new HttpContentDecompressor())
        .addLast("encoder", new HttpResponseEncoder());

    final HttpObjectAggregator objectAggregator =
        new HttpObjectAggregator(Math.toIntExact(transport.maxContentLength.getBytes()));
    objectAggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    ch.pipeline().addLast("aggregator", objectAggregator);

    if (transport.compression) {
        ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
    }

    ch.pipeline().addLast(
        "handler",
        new MainAndStaticFileHandler(nodeName, home, nodeClient, transport.getCorsConfig()));
    pipelineRegistry.registerItems(ch.pipeline(), transport.getCorsConfig());

    final boolean corsEnabled = SETTING_CORS_ENABLED.get(transport.settings());
    if (corsEnabled) {
        ch.pipeline().addAfter("encoder", "cors", new Netty4CorsHandler(transport.getCorsConfig()));
    }
}
/**
 * Drives the handshake for one inbound buffer.
 *
 * <p>Three outcomes are possible: a further handshake message is produced and written back;
 * the handshake is not yet successful and the method just logs and returns; or the handshake
 * has succeeded, in which case this handler replaces itself with a {@code DeFramer}, sends a
 * hello message and passes {@code msg} on to the next handler.
 *
 * @param ctx the handler context
 * @param msg the received bytes
 */
@Override
protected final void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) {
final Optional<ByteBuf> nextMsg = nextHandshakeMessage(msg);
if (nextMsg.isPresent()) {
// The handshake produced a reply: send it straight away.
ctx.writeAndFlush(nextMsg.get());
} else if (handshaker.getStatus() != Handshaker.HandshakeStatus.SUCCESS) {
// No reply and not finished yet.
LOG.debug("waiting for more bytes");
} else {
final Bytes nodeId = handshaker.partyPubKey().getEncodedBytes();
if (!localNode.isReady()) {
// If we're handling a connection before the node is fully up, just disconnect
LOG.debug("Rejecting connection because local node is not ready {}", nodeId);
disconnect(ctx, DisconnectMessage.DisconnectReason.UNKNOWN);
return;
}
LOG.debug("Sending framed hello");
// Exchange keys done
final Framer framer = new Framer(handshaker.secrets());
final ByteToMessageDecoder deFramer =
new DeFramer(
framer,
subProtocols,
localNode,
expectedPeer,
connectionEventDispatcher,
connectionFuture,
metricsSystem);
// Put the outbound validator at the head of the pipeline and swap this handler for
// the de-framer, registered under the name "DeFramer".
ctx.channel()
.pipeline()
.addFirst(new ValidateFirstOutboundMessage(framer))
.replace(this, "DeFramer", deFramer);
ctx.writeAndFlush(new OutboundMessage(null, HelloMessage.create(localNode.getPeerInfo())))
.addListener(
ff -> {
if (ff.isSuccess()) {
LOG.debug("Successfully wrote hello message");
}
});
// Hand the buffer to the next inbound handler. NOTE(review): the retain() presumably
// offsets a release done by the base class after channelRead0 returns - confirm.
msg.retain();
ctx.fireChannelRead(msg);
}
}
/**
 * Creates a context backed by a newly created {@code EmbeddedChannel}.
 *
 * @param alloc the buffer allocator
 * @param handler the handler this context belongs to
 * @param writeCumulator the cumulator used for writes
 */
protected EmbeddedChannelWriteAccumulatingHandlerContext(
        ByteBufAllocator alloc,
        ChannelHandler handler,
        ByteToMessageDecoder.Cumulator writeCumulator) {
    this(alloc, handler, writeCumulator, new EmbeddedChannel());
}
/**
 * Creates a context bound to the given channel.
 *
 * @param alloc the buffer allocator
 * @param handler the handler this context belongs to
 * @param writeCumulator the cumulator used for writes; must not be {@code null}
 * @param channel the embedded channel passed to the superclass
 */
protected EmbeddedChannelWriteAccumulatingHandlerContext(
        ByteBufAllocator alloc,
        ChannelHandler handler,
        ByteToMessageDecoder.Cumulator writeCumulator,
        EmbeddedChannel channel) {
    super(alloc, handler, channel);
    // Null check with the parameter name as the message argument.
    this.cumulator = ObjectUtil.checkNotNull(writeCumulator, "writeCumulator");
}
/**
 * Creates the benchmark context by delegating all arguments to the superclass constructor.
 *
 * @param alloc the buffer allocator
 * @param handler the handler this context belongs to
 * @param writeCumulator the cumulator used for writes
 */
SslThroughputBenchmarkHandlerContext(
        ByteBufAllocator alloc,
        ChannelHandler handler,
        ByteToMessageDecoder.Cumulator writeCumulator) {
    super(alloc, handler, writeCumulator);
}
/**
 * Creates a {@code NettyTOMMessageDecoder} whose first constructor argument is {@code true}.
 * MAC usage is enabled when the static configuration's {@code getUseMACs()} equals 1.
 *
 * @return a new decoder instance
 */
public ByteToMessageDecoder getDecoder(){
    // The comparison already yields a boolean; no ternary needed.
    final boolean useMacs = controller.getStaticConf().getUseMACs() == 1;
    return new NettyTOMMessageDecoder(true, sessionTable, macLength, controller, rl, signatureLength, useMacs);
}
/**
 * Creates a {@code NettyTOMMessageDecoder} whose first constructor argument is {@code false}.
 * MAC usage is enabled when the static configuration's {@code getUseMACs()} equals 1.
 *
 * @return a new decoder instance
 */
public ByteToMessageDecoder getDecoder(){
    // The comparison already yields a boolean; no ternary needed.
    final boolean useMacs = controller.getStaticConf().getUseMACs() == 1;
    return new NettyTOMMessageDecoder(false, sessionTable, macLength, controller, rl, signatureLength, useMacs);
}
/**
 * Creates a {@code BukkitDecodeHandler} from the given connection and decoder.
 *
 * @param info the user connection passed to the handler
 * @param minecraftDecoder the decoder passed to the handler
 * @return the new decode handler
 */
@Override
public BukkitDecodeHandler newDecodeHandler(UserConnection info, ByteToMessageDecoder minecraftDecoder) {
    final BukkitDecodeHandler decodeHandler = new BukkitDecodeHandler(info, minecraftDecoder);
    return decodeHandler;
}
/**
 * Stores the connection and the decoder for later use by this handler.
 *
 * @param info the user connection
 * @param minecraftDecoder the decoder kept by this handler
 */
public BukkitDecodeHandler(
        UserConnection info,
        ByteToMessageDecoder minecraftDecoder) {
    this.minecraftDecoder = minecraftDecoder;
    this.info = info;
}
/**
 * Stores the connection and the decoder for later use by this handler.
 *
 * @param info the user connection
 * @param minecraftDecoder the decoder kept by this handler
 */
public SpongeDecodeHandler(
        UserConnection info,
        ByteToMessageDecoder minecraftDecoder) {
    this.minecraftDecoder = minecraftDecoder;
    this.info = info;
}
/**
 * Returns a new {@code ZstdDecoder}.
 *
 * @return a newly created decoder instance
 */
@Override
public ByteToMessageDecoder getCompressDecoder() {
    final ByteToMessageDecoder zstdDecoder = new ZstdDecoder();
    return zstdDecoder;
}
/**
 * Returns a new {@code ZstdDecoder}.
 *
 * @return a newly created decoder instance
 */
@Override
public ByteToMessageDecoder getCompressDecoder() {
    final ByteToMessageDecoder zstdDecoder = new ZstdDecoder();
    return zstdDecoder;
}
/**
 * Creates a {@code NettyTOMMessageDecoder} whose first constructor argument is {@code true}.
 *
 * @return a new decoder instance
 */
public ByteToMessageDecoder getDecoder(){
    final ByteToMessageDecoder tomDecoder =
        new NettyTOMMessageDecoder(true, sessionTable, controller, rl);
    return tomDecoder;
}
/**
 * Creates a {@code NettyTOMMessageDecoder} whose first constructor argument is {@code false}.
 *
 * @return a new decoder instance
 */
public ByteToMessageDecoder getDecoder(){
    final ByteToMessageDecoder tomDecoder =
        new NettyTOMMessageDecoder(false, sessionTable, controller, rl);
    return tomDecoder;
}
/**
 * Returns a new {@code MessageDecoderV2}.
 *
 * @return a newly created decoder instance
 */
@Override
public ByteToMessageDecoder newDecoder() {
    final ByteToMessageDecoder v2Decoder = new MessageDecoderV2();
    return v2Decoder;
}
/**
 * Returns a new {@code MessageDecoderV1}.
 *
 * @return a newly created decoder instance
 */
@Override
public ByteToMessageDecoder newDecoder() {
    final ByteToMessageDecoder v1Decoder = new MessageDecoderV1();
    return v1Decoder;
}
/**
 * Returns the decoder.
 *
 * @return the decoder
 * @see ByteToMessageDecoder
 */
ByteToMessageDecoder getDecoder();
/**
 * Returns a new message decoder.
 *
 * @return a new message decoder, as a {@link ByteToMessageDecoder}
 */
ByteToMessageDecoder newDecoder();
/**
 * Creates a decode handler for the given connection.
 *
 * @param info the user connection the handler is created for
 * @param minecraftDecoder an existing decoder handed to the new handler
 *     (NOTE(review): presumably the server's original decoder that gets wrapped - confirm)
 * @return the decode handler
 */
public ByteToMessageDecoder newDecodeHandler(UserConnection info, ByteToMessageDecoder minecraftDecoder);
/**
 * Returns the decoder for the compression layer.
 * NOTE(review): purpose inferred from the method name - confirm against implementations.
 *
 * @return a {@link ByteToMessageDecoder}
 */
ByteToMessageDecoder getCompressDecoder();