类 io.netty.handler.codec.protobuf.ProtobufEncoder 源码实例Demo

下面列出了怎么用 io.netty.handler.codec.protobuf.ProtobufEncoder 的API类实例代码及写法,或者点击链接到github查看源代码。


@Override
protected void initChannel(Channel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();

    // netty提供的自定义长度解码器,解决TCP拆包/粘包问题
    pipeline.addLast("frameEncoder", new LengthFieldPrepender(2));
    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535,
            0, 2, 0, 2));

    // 增加protobuf编解码支持
    pipeline.addLast(new ProtobufEncoder());
    pipeline.addLast(new ProtobufDecoder(MessageProtobuf.Msg.getDefaultInstance()));

    // 握手认证消息响应处理handler
    pipeline.addLast(LoginAuthRespHandler.class.getSimpleName(), new LoginAuthRespHandler(imsClient));
    // 心跳消息响应处理handler
    pipeline.addLast(HeartbeatRespHandler.class.getSimpleName(), new HeartbeatRespHandler(imsClient));
    // 接收消息处理handler
    pipeline.addLast(TCPReadHandler.class.getSimpleName(), new TCPReadHandler(imsClient));
}
 

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline ph = ch.pipeline();
    /*
     * 解码和编码,应和服务端一致
     * */
    //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
    ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

    //传输的协议 Protobuf
    ph.addLast(new ProtobufVarint32FrameDecoder());
    ph.addLast(new ProtobufDecoder(UserMsg.User.getDefaultInstance()));
    ph.addLast(new ProtobufVarint32LengthFieldPrepender());
    ph.addLast(new ProtobufEncoder());

    //业务逻辑实现类
    ph.addLast("nettyClientHandler",new NettyClientHandler());

}
 

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline ph = ch.pipeline();

    //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
    ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
    // 解码和编码,应和客户端一致
    //传输的协议 Protobuf
    ph.addLast(new ProtobufVarint32FrameDecoder());
    ph.addLast(new ProtobufDecoder(UserMsg.User.getDefaultInstance()));
    ph.addLast(new ProtobufVarint32LengthFieldPrepender());
    ph.addLast(new ProtobufEncoder());

    //业务逻辑实现类
    ph.addLast("nettyServerHandler", new NettyServerHandler());
}
 

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline ph = ch.pipeline();
    /*
     * 解码和编码,应和服务端一致
     * */
    //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
    ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));  
    
    //传输的协议 Protobuf
    ph.addLast(new ProtobufVarint32FrameDecoder());
    ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
    ph.addLast(new ProtobufVarint32LengthFieldPrepender());
    ph.addLast(new ProtobufEncoder());
   
    //业务逻辑实现类
    ph.addLast("nettyClientHandler",nettyClientHandler);
  
}
 

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline ph = ch.pipeline();
 
    //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
    ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
    // 解码和编码,应和客户端一致
    //传输的协议 Protobuf
    ph.addLast(new ProtobufVarint32FrameDecoder());
    ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
    ph.addLast(new ProtobufVarint32LengthFieldPrepender());
    ph.addLast(new ProtobufEncoder());
    
    //业务逻辑实现类
    ph.addLast("nettyServerHandler", nettyServerHandler);
}
 
源代码6 项目: tcp-gateway   文件: TcpClient.java

/**
 * Init Bootstrap
 */
public static final Bootstrap getBootstrap() {
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap b = new Bootstrap();
    b.group(group);
    b.channel(NioSocketChannel.class);
    b.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
            pipeline.addLast("decoder", new ProtobufDecoder(MessageBuf.JMTransfer.getDefaultInstance()));
            pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast("encoder", new ProtobufEncoder());
            pipeline.addLast("handler", new TcpClientHandler());
        }
    });


    b.option(ChannelOption.SO_KEEPALIVE, true);
    return b;
}
 

@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc(), NettyPistachioClient.HOST, NettyPistachioClient.PORT));
    }

    LogLevel level = LogLevel.DEBUG;


    p.addLast(new LoggingHandler(level));
    p.addLast(new ReadTimeoutHandler(ConfigurationManager.getConfiguration().getInt("Network.Netty.ClientReadTimeoutMillis",10000), TimeUnit.MILLISECONDS));
    p.addLast(new ProtobufVarint32FrameDecoder());
    p.addLast(new ProtobufDecoder(NettyPistachioProtocol.Response.getDefaultInstance()));

    p.addLast(new ProtobufVarint32LengthFieldPrepender());
    p.addLast(new ProtobufEncoder());

    p.addLast(new NettyPistachioClientHandler());
}
 

@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc()));
    }

    p.addLast(new ProtobufVarint32FrameDecoder());
    p.addLast(new ProtobufDecoder(WorldClockProtocol.Locations.getDefaultInstance()));

    p.addLast(new ProtobufVarint32LengthFieldPrepender());
    p.addLast(new ProtobufEncoder());

    p.addLast(new WorldClockServerHandler());
}
 

@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc(), WorldClockClient.HOST, WorldClockClient.PORT));
    }

    p.addLast(new ProtobufVarint32FrameDecoder());
    p.addLast(new ProtobufDecoder(WorldClockProtocol.LocalTimes.getDefaultInstance()));

    p.addLast(new ProtobufVarint32LengthFieldPrepender());
    p.addLast(new ProtobufEncoder());

    p.addLast(new WorldClockClientHandler());
}
 
源代码10 项目: ratel   文件: DefaultChannelInitializer.java

@Override
protected void initChannel(SocketChannel ch) throws Exception {
	
	ch.pipeline()
	.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS))
       .addLast(new ProtobufVarint32FrameDecoder())
       .addLast(new ProtobufDecoder(ClientTransferData.ClientTransferDataProtoc.getDefaultInstance()))
       .addLast(new ProtobufVarint32LengthFieldPrepender())
       .addLast(new ProtobufEncoder())
       .addLast(new SecondProtobufCodec())
       .addLast(new TransferHandler());
	
}
 
源代码11 项目: ratel   文件: DefaultChannelInitializer.java

@Override
protected void initChannel(SocketChannel ch) throws Exception {

	ch.pipeline()
	.addLast(new IdleStateHandler(60 * 30, 0, 0, TimeUnit.SECONDS))
       .addLast(new ProtobufVarint32FrameDecoder())
       .addLast(new ProtobufDecoder(ServerTransferData.ServerTransferDataProtoc.getDefaultInstance()))
       .addLast(new ProtobufVarint32LengthFieldPrepender())
       .addLast(new ProtobufEncoder())
       .addLast(new SecondProtobufCodec())
       .addLast(new TransferHandler());
	
}
 
源代码12 项目: TakinRPC   文件: ClientInitializer.java

@Override
protected void initChannel(T ch) throws Exception {
    ChannelPipeline p = ch.pipeline();

    p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
    p.addLast("protobufDecoder", new ProtobufDecoder(NettyRpcProto.RpcContainer.getDefaultInstance()));

    p.addLast("frameEncoder", new LengthFieldPrepender(4));
    p.addLast("protobufEncoder", new ProtobufEncoder());

    ConcurrentHashMap<Integer, RpcCall> callMap = new ConcurrentHashMap<Integer, RpcCall>();
    p.addLast(eventExecutor, "inboundHandler", new InboundHandler(callMap));
    p.addLast("outboundHandler", new OutboundHandler(callMap));

}
 
源代码13 项目: TakinRPC   文件: ServerInitializer.java

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();

    p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
    p.addLast("protobufDecoder", new ProtobufDecoder(NettyRpcProto.RpcContainer.getDefaultInstance()));

    p.addLast("frameEncoder", new LengthFieldPrepender(4));
    p.addLast("protobufEncoder", new ProtobufEncoder());

    p.addLast(eventExecutor, "serverHandler", handler);
}
 
源代码14 项目: util4j   文件: InitalizerProtobufHandler.java

@Override
protected void initChannel(SocketChannel ch) throws Exception {
	ChannelPipeline p = ch.pipeline();
	p.addLast(new ProtobufVarint32FrameDecoder());
	p.addLast(new ProtobufDecoder(prototype,extensionRegistry));
	p.addLast(new ProtobufVarint32LengthFieldPrepender());
	p.addLast(new ProtobufEncoder());
	p.addLast(new DefaultIdleListenerHandler<T>(listener));
}
 

@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc()));
    }

    p.addLast(new ProtobufVarint32FrameDecoder());
    p.addLast(new ProtobufDecoder(WorldClockProtocol.Locations.getDefaultInstance()));

    p.addLast(new ProtobufVarint32LengthFieldPrepender());
    p.addLast(new ProtobufEncoder());

    p.addLast(new WorldClockServerHandler());
}
 

@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc(), WorldClockClient.HOST, WorldClockClient.PORT));
    }

    p.addLast(new ProtobufVarint32FrameDecoder());
    p.addLast(new ProtobufDecoder(WorldClockProtocol.LocalTimes.getDefaultInstance()));

    p.addLast(new ProtobufVarint32LengthFieldPrepender());
    p.addLast(new ProtobufEncoder());

    p.addLast(new WorldClockClientHandler());
}
 

@Override
protected void initChannel(Channel channel) throws Exception {
  ChannelPipeline pipeline = channel.pipeline();
  pipeline.addLast("MonitorServerHandler", new MonitorServerHandler());
  pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
  pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
  pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
  pipeline.addLast("protobufEncoder", new ProtobufEncoder());
  pipeline.addLast("handler", handler);
}
 

@Override
protected void initChannel(Channel channel) throws Exception {
  ChannelPipeline pipeline = channel.pipeline();
  pipeline.addLast("idleStateHandler",
      new IdleStateHandler(idleTimeout, idleTimeout / 2, 0, TimeUnit.MILLISECONDS));

  if (hangDetection) {
    pipeline.addLast("MonitorClientHandler", new MonitorClientHandler());
  }
  pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
  pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
  pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
  pipeline.addLast("protobufEncoder", new ProtobufEncoder());
  pipeline.addLast("handler", handler);
}
 
源代码19 项目: spliceengine   文件: OlapPipelineFactory.java

@Override
protected void initChannel(Channel channel) throws Exception {
    SpliceLogUtils.trace(LOG, "Creating new channel pipeline...");
    ChannelPipeline pipeline = channel.pipeline();
    pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(1<<30,0,4,0,4)); //max frame size is 1GB=2^30 bytes
    pipeline.addLast("protobufDecoder",decoder);
    pipeline.addLast("frameEncoder",new LengthFieldPrepender(4));
    pipeline.addLast("protobufEncoder",new ProtobufEncoder());
    pipeline.addLast("statusHandler", statusHandler);
    pipeline.addLast("submitHandler", submitHandler);
    pipeline.addLast("cancelHandler",cancelHandler);
    SpliceLogUtils.trace(LOG, "Done creating channel pipeline");
}
 
源代码20 项目: NettyChat   文件: NettyServerDemo.java

public static void main(String[] args) {

        //boss线程监听端口,worker线程负责数据读写
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            //辅助启动类
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置线程池
            bootstrap.group(boss, worker);

            //设置socket工厂
            bootstrap.channel(NioServerSocketChannel.class);

            //设置管道工厂
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //获取管道
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(2));
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535,
                            0, 2, 0, 2));
                    pipeline.addLast(new ProtobufDecoder(MessageProtobuf.Msg.getDefaultInstance()));
                    pipeline.addLast(new ProtobufEncoder());
                    //处理类
                    pipeline.addLast(new ServerHandler());
                }
            });

            //设置TCP参数
            //1.链接缓冲池的大小(ServerSocketChannel的设置)
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            //维持链接的活跃,清除死链接(SocketChannel的设置)
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            //关闭延迟发送
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);

            //绑定端口
            ChannelFuture future = bootstrap.bind(8855).sync();
            System.out.println("server start ...... ");

            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //优雅退出,释放线程池资源
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
 
源代码21 项目: tcp-gateway   文件: ProtobufAdapter.java

public ProtobufEncoder getEncoder() {
    return encoder;
}
 

@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc()));
    }

    LogLevel level = LogLevel.DEBUG;



    p.addLast(new LoggingHandler(level));

    p.addLast(new ReadTimeoutHandler(ConfigurationManager.getConfiguration().getInt("Network.Netty.ServerReadTimeoutMillis",10000), TimeUnit.MILLISECONDS));
    p.addLast(new ProtobufVarint32FrameDecoder());
    p.addLast(new ProtobufDecoder(NettyPistachioProtocol.Request.getDefaultInstance()));

    p.addLast(new ProtobufVarint32LengthFieldPrepender());
    p.addLast(new ProtobufEncoder());

    p.addLast(new NettyPistachioServerHandler(handler));
}
 
源代码23 项目: anetty_client   文件: NettyServerManager.java

/**
 * 连接方法
 * 
 * @param host
 * @param port
 * @throws Exception
 */
@SuppressWarnings("rawtypes")
public void connect(final String host, final int port, final INettyHandlerListener connectionListener) throws Exception {
	Log.i(getClass().getName(), "connect come in!connectState=" + connectState);
	if (isConnected() || connectState == CONNECT_PROCESSORING) {
		// 連接成功 停止重连
		NettyAlarmManager.stopReconnection();
		return;
	}
	Log.i(getClass().getName(), "connect come in!CONNECT_PROCESSORING!");
	connectState = CONNECT_PROCESSORING;
	mHost = host;
	mPort = port;
	System.setProperty("java.net.preferIPv4Stack", "true");
	System.setProperty("java.net.preferIPv6Addresses", "false");
	ChannelFuture channelFuture = null;
	group = new NioEventLoopGroup();
	try {
		Bootstrap b = new Bootstrap();
		b.group(group);
		b.channel(NioSocketChannel.class);
		b.option(ChannelOption.SO_KEEPALIVE, true);
		b.option(ChannelOption.TCP_NODELAY, true);
		b.remoteAddress(new InetSocketAddress(mHost, mPort));
		// 有连接到达时会创建一个channel
		final ExtensionRegistry registry = ExtensionRegistry.newInstance();
		CommandProtoc.registerAllExtensions(registry);
		b.handler(new ChannelInitializer<SocketChannel>() {
			public void initChannel(SocketChannel ch) throws Exception {					
				ChannelPipeline pipeline = ch.pipeline();
				pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
				pipeline.addLast("protobufDecoder", new ProtobufDecoder(CommandProtoc.PushMessage.getDefaultInstance(), registry));
				pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
				pipeline.addLast("protobufEncoder", new ProtobufEncoder());
				pipeline.addLast(new PushMessageHandler(connectionManager, mNettyProcessorHandler));
			}
		});
		channelFuture = b.connect().sync();
		channelFuture.addListener(new ChannelFutureListener() {
			@SuppressWarnings("unchecked")
			public void operationComplete(ChannelFuture future) throws Exception {
				if (future.isSuccess()) {
					SocketAddress sa = future.channel().remoteAddress();								
					if(sa!=null){
						Log.i(getClass().getName(), "netty server connected success! host:" + sa);				
						// 連接成功
						connectState = CONNECT_SUCCESS;
						if (connectionListener != null) {
							connectionListener.callback(null);
						}
						// 启动心跳程序
						NettyAlarmManager.startHeart(mContext);
						// 連接成功 停止重连
						NettyAlarmManager.stopReconnection();
					}else{
						Log.i(getClass().getName(), "netty server connected failed! host:" + sa);
						// 連接失敗
						connectState = CONNECT_FAILED;
						// 連接 失敗 啟動重連
						future.cause().printStackTrace();
						future.channel().close();
					}						
				} else {
					Log.i(getClass().getName(), "netty server attemp failed! host:" + future.channel().remoteAddress());
					// 連接失敗
					connectState = CONNECT_FAILED;
					// 連接 失敗 啟動重連
					future.cause().printStackTrace();
					future.channel().close();
					// NettyAlarmManager.startReconnection(mContext);
					// if (mNetworkCallback != null) {
					// mNetworkCallback.connectFailed();
					// }
				}
			}
		});
		// Wait until the connection is closed.
		// channelFuture.channel().closeFuture().sync();
	} catch (Exception e) {
		Log.i(getClass().getName(), e.getMessage());
		connectState = CONNECT_EXCEPTION;
		// 连接关闭后启动重连
		NettyAlarmManager.startReconnection(mContext);
	} finally {
		Log.i(getClass().getName(), "connect finally!connectState=" + connectState);
		disconnect(channelFuture);
	}
}
 
 类所在包
 同包方法