类 io.netty.handler.codec.LengthFieldBasedFrameDecoder 源码实例Demo

下面列出了怎么用 io.netty.handler.codec.LengthFieldBasedFrameDecoder 的API类实例代码及写法,或者点击链接到github查看源代码。


@Override
protected void initChannel(Channel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();

    // netty提供的自定义长度解码器,解决TCP拆包/粘包问题
    pipeline.addLast("frameEncoder", new LengthFieldPrepender(2));
    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535,
            0, 2, 0, 2));

    // 增加protobuf编解码支持
    pipeline.addLast(new ProtobufEncoder());
    pipeline.addLast(new ProtobufDecoder(MessageProtobuf.Msg.getDefaultInstance()));

    // 握手认证消息响应处理handler
    pipeline.addLast(LoginAuthRespHandler.class.getSimpleName(), new LoginAuthRespHandler(imsClient));
    // 心跳消息响应处理handler
    pipeline.addLast(HeartbeatRespHandler.class.getSimpleName(), new HeartbeatRespHandler(imsClient));
    // 接收消息处理handler
    pipeline.addLast(TCPReadHandler.class.getSimpleName(), new TCPReadHandler(imsClient));
}
 

@Override
protected void initChannel(Channel ch) throws Exception {
       ChannelPipeline pipeline = ch.pipeline();
       pipeline.addLast(new IdleStateHandler(0,0,35));
       pipeline.addLast(new IdleStateTrigger());
       //��ư��ĸ�ʽ 1�ֽڹ̶���ͷ  1�ֽڹ�����  1�ֽڣ��ж��Ƿ����topic�ֶΣ� 4�ֽڹ̶������ֶ�   12�ֽڹ̶�topic���DZ��룩  ʣ���ֽ�����
       pipeline.addLast(new LengthFieldBasedFrameDecoder(2048, 3, 4, 0, 0));
       pipeline.addLast(new MessageToPoDecoder());
       //�����֤�Ĵ�����
       //pipeline.addLast("auth",new AuthenticationHandler());
       //���Э�鴦����
       pipeline.addLast( "message-process", new MessageProcessHandler());
       pipeline.addLast(new MessageEncoder());
       //pipeline.addLast("auth",new AuthenticationHandler());
       //pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
}
 

private Bootstrap initClientBootstrap() {
    Bootstrap b = new Bootstrap();
    eventLoopGroup = new NioEventLoopGroup();
    b.group(eventLoopGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                clientHandler = new TokenClientHandler(currentState, disconnectCallback);

                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                pipeline.addLast(new NettyResponseDecoder());
                pipeline.addLast(new LengthFieldPrepender(2));
                pipeline.addLast(new NettyRequestEncoder());
                pipeline.addLast(clientHandler);
            }
        });

    return b;
}
 
源代码4 项目: litchi   文件: NettyRpcClient.java

public NettyRpcClient(Litchi litchi, String nodeType, String nodeId, String host, int port) {
    this(nodeType, nodeId, host, port);

    bootstrap.group(bossGroup).channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(DEFAULT_MAX_FRAME_LEN, 0, 4, 0, 0));
                    ch.pipeline().addLast(new RpcDecoder());
                    ch.pipeline().addLast(new RpcEncoder());
                    ch.pipeline().addLast(new GameEventHandler(litchi));
                    ch.pipeline().addLast(new RpcCallbackHandler(litchi, futureContext));
                    ch.pipeline().addLast(new ResponseHandler(litchi));
                }
            })
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true);
}
 

@Test
public void testFailSlowTooLongFrameRecovery() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel(
            new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4, false));

    for (int i = 0; i < 2; i ++) {
        assertFalse(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
        try {
            assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0 })));
            fail(DecoderException.class.getSimpleName() + " must be raised.");
        } catch (TooLongFrameException e) {
            // Expected
        }

        ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 1, 'A' }));
        ByteBuf buf = ch.readInbound();
        assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
        buf.release();
    }
}
 

@Test
public void testFailFastTooLongFrameRecovery() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel(
            new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4));

    for (int i = 0; i < 2; i ++) {
        try {
            assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
            fail(DecoderException.class.getSimpleName() + " must be raised.");
        } catch (TooLongFrameException e) {
            // Expected
        }

        ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 1, 'A' }));
        ByteBuf buf = ch.readInbound();
        assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
        buf.release();
    }
}
 
源代码7 项目: Summer   文件: ClientInitializer.java

@Override
protected void initChannel(SocketChannel sc) {
	ChannelPipeline pipeline = sc.pipeline();
	ClientConfig config = clientContext.getConfig();
	if (ServerConst.SERVER_PROTOCOL_STRING_LINE.equals(config.getProtocol())) {
		pipeline.addLast(new StringPasswordLineDecoder(config.getMsgLength(), config.getCharset(), config.getPassword()));
		pipeline.addLast(new StringPasswordLineEncoder(config.getCharset(), config.getPassword()));
	} else if (ServerConst.SERVER_PROTOCOL_LENGTH_FIELD.equals(config.getProtocol())) {
		pipeline.addLast(new LengthFieldBasedFrameDecoder(config.getMsgLength(), 0, 4, 0, 4));
		pipeline.addLast(new LengthFieldPrepender(4));
		pipeline.addLast(new StringPasswordDecoder(config.getCharset(), config.getPassword()));
		pipeline.addLast(new StringPasswordEncoder(config.getCharset(), config.getPassword()));
	} else {
		throw new NotFoundProtocolException(config.getProtocol());
	}
	pipeline.addLast(new ClientStringHandler(clientContext));
}
 
源代码8 项目: Sentinel   文件: NettyTransportClient.java

private Bootstrap initClientBootstrap() {
    Bootstrap b = new Bootstrap();
    eventLoopGroup = new NioEventLoopGroup();
    b.group(eventLoopGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                clientHandler = new TokenClientHandler(currentState, disconnectCallback);

                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                pipeline.addLast(new NettyResponseDecoder());
                pipeline.addLast(new LengthFieldPrepender(2));
                pipeline.addLast(new NettyRequestEncoder());
                pipeline.addLast(clientHandler);
            }
        });

    return b;
}
 
源代码9 项目: krpc   文件: TCPClient.java

/**
 * 初始化Bootstrap
 * 
 * @return
 */
public static final Bootstrap getBootstrap() {
	EventLoopGroup group = new NioEventLoopGroup();
	Bootstrap b = new Bootstrap();
	b.group(group).channel(NioSocketChannel.class);
	b.handler(new ChannelInitializer<Channel>() {
		@Override
		protected void initChannel(Channel ch) throws Exception {
			ChannelPipeline pipeline = ch.pipeline();
			pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
			pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
			pipeline.addLast("decoder", new ByteArrayDecoder());
			pipeline.addLast("encoder", new ByteArrayEncoder());
			pipeline.addLast("handler", new TcpClientHandler());
		}
	});
	return b;
}
 
源代码10 项目: krpc   文件: TcpServer.java

protected static void run() throws Exception {
		ServerBootstrap b = new ServerBootstrap();
		b.group(bossGroup, workerGroup);
		b.channel(NioServerSocketChannel.class);
		b.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				ChannelPipeline pipeline = ch.pipeline();
				pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
				pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
				pipeline.addLast("decoder", new ByteArrayDecoder());
				pipeline.addLast("encoder", new ByteArrayEncoder());
//				pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
//				pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
				pipeline.addLast(new TcpServerHandler());
			}
		});

		// b.bind(IP, PORT).sync();
		ChannelFuture f = b.bind(PORT).sync(); // (7)

		f.channel().closeFuture().sync();

		System.out.println("TCP服务器已启动");
	}
 

@Override
protected void initChannel(Channel ch) throws Exception {

    ch.pipeline().addLast(new LengthFieldPrepender(4, false));
    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
            0, 4, 0, 4));

    ch.pipeline().addLast(new ObjectSerializerEncoder());
    ch.pipeline().addLast(new ObjectSerializerDecoder());


    ch.pipeline().addLast(rpcCmdDecoder);
    ch.pipeline().addLast(new RpcCmdEncoder());
    ch.pipeline().addLast(nettyClientRetryHandler);
    ch.pipeline().addLast(socketManagerInitHandler);
    ch.pipeline().addLast(rpcAnswerHandler);
}
 

@Override
protected void initChannel(Channel ch) throws Exception {
    ch.pipeline().addLast(new LengthFieldPrepender(4, false));
    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));

    ch.pipeline().addLast(new IdleStateHandler(managerProperties.getCheckTime(),
            managerProperties.getCheckTime(), managerProperties.getCheckTime(), TimeUnit.MILLISECONDS));


    ch.pipeline().addLast(new ObjectSerializerEncoder());
    ch.pipeline().addLast(new ObjectSerializerDecoder());


    ch.pipeline().addLast(rpcCmdDecoder);
    ch.pipeline().addLast(new RpcCmdEncoder());
    ch.pipeline().addLast(socketManagerInitHandler);
    ch.pipeline().addLast(rpcAnswerHandler);
}
 
源代码13 项目: push   文件: Client.java

public void run() {
    workerGroup = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(workerGroup);
        b.channel(NioSocketChannel.class);
        // b.option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                pipeline.addLast("decoder", new MsgPackDecode());
                pipeline.addLast("encoder", new MsgPackEncode());
                pipeline.addLast(ClientConfiguration.clientHandler());
            }
        });
        channel = b.connect(clientProperties.getServerHost(), clientProperties.getServerPort()).sync().channel();
        status = Status.START;
        channel.closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    }
    status = Status.STOP;
}
 
源代码14 项目: GoPush   文件: NodeServerBootstrap.java

@PostConstruct
public void start() throws Exception {

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workGroup)
            .channelFactory(NioServerSocketChannel::new)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {

                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                    pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
                    pipeline.addLast("idleStateHandler", new IdleStateHandler(300, 0, 0));
                    pipeline.addLast("handler", nodeChannelInBoundHandler);
                }
            })
            .option(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_SNDBUF, 2048)
            .option(ChannelOption.SO_RCVBUF, 1024);
    bootstrap.bind(goPushNodeServerConfig.getNodePort()).sync();
    log.info("Node server start successful! listening port: {}", goPushNodeServerConfig.getNodePort());
}
 
源代码15 项目: ServerCore   文件: NetworkServiceImpl.java

@Override
protected void initChannel(Channel ch) {
    ChannelPipeline pip = ch.pipeline();
    int maxLength = 1048576;
    int lengthFieldLength = 4;
    int ignoreLength = -4;
    int offset = 0;
    pip.addLast(new LengthFieldBasedFrameDecoder(maxLength, offset, lengthFieldLength, ignoreLength, lengthFieldLength));
    pip.addLast(new MessageDecoder(builder.getImessageandhandler()));
    pip.addLast(new LengthFieldPrepender(4, true));
    pip.addLast(new MessageEncoder(builder.getImessageandhandler()));
    pip.addLast(new MessageExecutor(builder.getConsumer(), builder.getListener()));
    for (ChannelHandler handler : builder.getExtraHandlers()) {
        pip.addLast(handler);
    }
}
 
源代码16 项目: pulsar   文件: ServiceChannelInitializer.java

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    if (sslCtxRefresher != null && this.enableTls) {
        if (this.tlsEnabledWithKeyStore) {
            ch.pipeline().addLast(TLS_HANDLER,
                    new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
        } else{
            SslContext sslContext = sslCtxRefresher.get();
            if (sslContext != null) {
                ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
            }
        }
    }
    ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
        Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
    ch.pipeline().addLast("handler", new ServerConnection(discoveryService));
}
 

@Test
public void testFailFastTooLongFrameRecovery() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel(
            new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4));

    for (int i = 0; i < 2; i ++) {
        try {
            assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
            fail(DecoderException.class.getSimpleName() + " must be raised.");
        } catch (TooLongFrameException e) {
            // Expected
        }

        ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 1, 'A' }));
        ByteBuf buf = releaseLater((ByteBuf) ch.readInbound());
        assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
        buf.release();
    }
}
 
源代码18 项目: pulsar   文件: MockBrokerService.java

public void startMockBrokerService() throws Exception {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("mock-pulsar-%s").build();
    final int numThreads = 2;

    final int MaxMessageSize = 5 * 1024 * 1024;

    try {
        workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, threadFactory);

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(workerGroup, workerGroup);
        bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
                ch.pipeline().addLast("handler", new MockServerCnx());
            }
        });
        // Bind and start to accept incoming connections.
        listenChannel = bootstrap.bind(0).sync().channel();
    } catch (Exception e) {
        throw e;
    }
}
 
源代码19 项目: pulsar   文件: PulsarChannelInitializer.java

@Override
 public void initChannel(SocketChannel ch) throws Exception {
     if (tlsEnabled) {
         if (tlsEnabledWithKeyStore) {
             ch.pipeline().addLast(TLS_HANDLER,
                     new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
} else {
	SslHandler handler = StringUtils.isNotBlank(sniHostName)
			? sslContextSupplier.get().newHandler(ch.alloc(), sniHostName, sniHostPort)
			: sslContextSupplier.get().newHandler(ch.alloc());
	ch.pipeline().addLast(TLS_HANDLER, handler);
}
         ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
     } else {
         ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
     }

     ch.pipeline()
             .addLast("frameDecoder",
                     new LengthFieldBasedFrameDecoder(
                             Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING,
                             0, 4, 0, 4));
     ch.pipeline().addLast("handler", clientCnxSupplier.get());
 }
 
源代码20 项目: nomulus   文件: EppModule.java

/**
 * {@link Provides} the list of providers of {@link ChannelHandler}s that are used for the EPP
 * Protocol.
 */
@Provides
@EppProtocol
static ImmutableList<Provider<? extends ChannelHandler>> provideEppHandlerProviders(
    @EppProtocol Provider<SslClientInitializer<NioSocketChannel>> sslClientInitializerProvider,
    Provider<LengthFieldBasedFrameDecoder> lengthFieldBasedFrameDecoderProvider,
    Provider<LengthFieldPrepender> lengthFieldPrependerProvider,
    Provider<EppMessageHandler> eppMessageHandlerProvider,
    Provider<EppActionHandler> eppActionHandlerProvider) {
  return ImmutableList.of(
      sslClientInitializerProvider,
      lengthFieldBasedFrameDecoderProvider,
      lengthFieldPrependerProvider,
      eppMessageHandlerProvider,
      eppActionHandlerProvider);
}
 
源代码21 项目: Okra   文件: GpbTcpServer.java

@Override
    protected ChannelHandler newChannelInitializer() {
        return new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline cp = ch.pipeline();
                cp.addLast("frame", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2));
                cp.addLast("prepender", FRAME_PREPENDER);
                cp.addLast("decoder", GPB_DECODER_HANDLER);
                cp.addLast("encoder", GPB_ENCODER_HANDLER);
                // handler
                cp.addLast("handler", serverHandler);
//                cp.addLast("handler", new ServerHandler());
            }
        };
    }
 

@Test
public void testFailSlowTooLongFrameRecovery() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel(
            new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4, false));

    for (int i = 0; i < 2; i ++) {
        assertFalse(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
        try {
            assertTrue(ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0 })));
            fail(DecoderException.class.getSimpleName() + " must be raised.");
        } catch (TooLongFrameException e) {
            // Expected
        }

        ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 1, 'A' }));
        ByteBuf buf = releaseLater((ByteBuf) ch.readInbound());
        assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
        buf.release();
    }
}
 

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    // 读超时控制 - 注意:netty的EventLoop虽然支持定时任务任务,但是定时任务对EventLoop非常不友好,要尽量减少这种定时任务。
    pipeline.addLast(NetUtils.READ_TIMEOUT_HANDLER_NAME, new ReadTimeoutHandler(portExtraInfo.getSessionConfig().readTimeout()));
    pipeline.addLast(new LengthFieldBasedFrameDecoder(portExtraInfo.getSessionConfig().maxFrameLength(), 0, 4, 0, 4));
    pipeline.addLast(new ServerSocketCodec(portExtraInfo.getSessionConfig().serializer(), portExtraInfo));
}
 

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    // 读超时控制 - 注意:netty的EventLoop虽然支持定时任务任务,但是定时任务对EventLoop非常不友好,要尽量减少这种定时任务。
    pipeline.addLast(NetUtils.READ_TIMEOUT_HANDLER_NAME, new ReadTimeoutHandler(config.readTimeout()));
    pipeline.addLast(new LengthFieldBasedFrameDecoder(config.maxFrameLength(), 0, 4, 0, 4));
    pipeline.addLast(new ClientSocketCodec(config.serializer(), sessionId, netEventLoop));
}
 
源代码25 项目: kop   文件: KafkaChannelInitializer.java

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    if (this.enableTls) {
        ch.pipeline().addLast(TLS_HANDLER, new SslHandler(SSLUtils.createSslEngine(sslContextFactory)));
    }
    ch.pipeline().addLast(new LengthFieldPrepender(4));
    ch.pipeline().addLast("frameDecoder",
        new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
    ch.pipeline().addLast("handler",
        new KafkaRequestHandler(pulsarService, kafkaConfig, groupCoordinator, enableTls));
}
 

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline cp = socketChannel.pipeline();
    cp.addLast(new RpcEncoder(RpcRequest.class));
    cp.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0));
    cp.addLast(new RpcDecoder(RpcResponse.class));
    cp.addLast(new RpcClientHandler());
}
 

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    pipeline.addLast(new RpcEncoder(RpcResponse.class))
            .addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0))
            .addLast(new RpcDecoder(RpcRequest.class))
            .addLast(new RpcServerHandler(this.handleMap));
}
 
源代码28 项目: Okra   文件: BenchmarkClient.java

@Override
protected ChannelHandler newChannelInitializer() {
    return new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ChannelPipeline cp = ch.pipeline();
            cp.addLast("frame", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2));
            cp.addLast("prepender", FRAME_PREPENDER);
            // Any other useful handler
            cp.addLast("strDecoder", STRING_DECODER);
            cp.addLast("strEncoder", STRING_ENCODER);
            cp.addLast("handler", new SimpleChannelInboundHandler<String>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                    COUNT.getAndIncrement();
                    ChannelPromise voidPromise = ctx.voidPromise();
                    if (ctx.channel().isWritable()) {
                        ctx.writeAndFlush(msg, voidPromise);
                    } else {
                        ctx.channel().eventLoop().schedule(() -> {
                            ctx.writeAndFlush(msg, voidPromise);
                        }, 1L, TimeUnit.SECONDS);
                    }
                }
            });
        }
    };
}
 
源代码29 项目: Summer   文件: ServerInitializer.java

@Override
protected void initChannel(SocketChannel sc) {
	ChannelPipeline pipeline = sc.pipeline();
	ServerConfig config = serverContext.getConfig();
	if (ServerConst.SERVER_PROTOCOL_STRING_LINE.equals(config.getProtocol())) {
		pipeline.addLast(new StringPasswordLineDecoder(config.getMsgLength(), config.getCharset(), config.getPassword()));
		pipeline.addLast(new StringPasswordLineEncoder(config.getCharset(), config.getPassword()));
		pipeline.addLast(new ServerStringHandler(serverContext));
	} else if (ServerConst.SERVER_PROTOCOL_WEB_SOCKET.equals(config.getProtocol())) {
		pipeline.addLast(new HttpServerCodec());
		pipeline.addLast(new HttpObjectAggregator(config.getMsgLength()));
		pipeline.addLast(new ChunkedWriteHandler());
		pipeline.addLast(new WebSocketUriFilter(serverContext));
		pipeline.addLast(new WebSocketServerProtocolHandler("/" + config.getServerName()));
		pipeline.addLast(new WebSocketDecoder());
		pipeline.addLast(new WebSocketEncoder());
		pipeline.addLast(new LengthFieldBasedFrameDecoder(config.getMsgLength(), 0, 4, 0, 4));
		pipeline.addLast(new LengthFieldPrepender(4));
		pipeline.addLast(new StringPasswordDecoder(config.getCharset(), config.getPassword()));
		pipeline.addLast(new StringPasswordEncoder(config.getCharset(), config.getPassword()));
		pipeline.addLast(new ServerStringHandler(serverContext));
	} else if (ServerConst.SERVER_PROTOCOL_LENGTH_FIELD.equals(config.getProtocol())) {
		pipeline.addLast(new LengthFieldBasedFrameDecoder(config.getMsgLength(), 0, 4, 0, 4));
		pipeline.addLast(new LengthFieldPrepender(4));
		pipeline.addLast(new StringPasswordDecoder(config.getCharset(), config.getPassword()));
		pipeline.addLast(new StringPasswordEncoder(config.getCharset(), config.getPassword()));
		pipeline.addLast(new ServerStringHandler(serverContext));
	} else if (ServerConst.SERVER_PROTOCOL_HTTP.equals(config.getProtocol())) {
		pipeline.addLast(new HttpServerCodec());
		pipeline.addLast(new HttpObjectAggregator(config.getMsgLength()));
		pipeline.addLast(new ChunkedWriteHandler());
		pipeline.addLast(new WebRequestHandler(serverContext));
	} else {
		throw new NotFoundProtocolException(config.getProtocol());
	}
}
 

@Override
public void buildChannelPipeline(ChannelPipeline pipeline) {
    pipeline.addLast("lengthEncode", new LengthFieldPrepender(4, false));
    pipeline.addLast("lengthDecoder", new LengthFieldBasedFrameDecoder(2000, 0, 4, 0, 4));
    pipeline.addLast(MsgOutboundHandler.INSTANCE);
    if(socketConfiguration.getProtocolType() == SocketConfiguration.ProtocolType.TEXT){
        pipeline.addLast(DefaultTextTcpSocketServerHandler.INSTANCE);
    }else{
        pipeline.addLast(ProtocolTcpSocketServerHandler.INSTANCE);
    }
}
 
 类所在包
 类方法
 同包方法