下面列出了 io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder 与 io.netty.handler.codec.protobuf.ProtobufEncoder 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Builds the client pipeline: length-prefixed framing, protobuf codec,
 * then the login-auth, heartbeat and read handlers (in that order).
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    channel.pipeline()
            // Netty's length-field framing deals with TCP packet splitting/coalescing.
            .addLast("frameEncoder", new LengthFieldPrepender(2))
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
            // Protobuf encoding/decoding support.
            .addLast(new ProtobufEncoder())
            .addLast(new ProtobufDecoder(MessageProtobuf.Msg.getDefaultInstance()))
            // Handler for the handshake/authentication response.
            .addLast(LoginAuthRespHandler.class.getSimpleName(), new LoginAuthRespHandler(imsClient))
            // Handler for the heartbeat response.
            .addLast(HeartbeatRespHandler.class.getSimpleName(), new HeartbeatRespHandler(imsClient))
            // Handler for received messages.
            .addLast(TCPReadHandler.class.getSimpleName(), new TCPReadHandler(imsClient));
}
/**
 * Builds the client pipeline: write-idle detection, varint32-framed protobuf
 * codec for {@code UserMsg.User}, then the business handler.
 * The codec setup must match the server side.
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // Arguments: reader idle time, writer idle time, all idle time, time unit.
            .addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS))
            // Transport protocol: Protobuf.
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(UserMsg.User.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            // Business logic handler.
            .addLast("nettyClientHandler", new NettyClientHandler());
}
/**
 * Builds the server pipeline: read-idle detection, varint32-framed protobuf
 * codec for {@code UserMsg.User}, then the business handler.
 * The codec setup must match the client side.
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // Arguments: reader idle time, writer idle time, all idle time, time unit.
            .addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS))
            // Transport protocol: Protobuf.
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(UserMsg.User.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            // Business logic handler.
            .addLast("nettyServerHandler", new NettyServerHandler());
}
/**
 * Builds the client pipeline: write-idle detection, varint32-framed protobuf
 * codec for {@code UserMsg}, then the {@code nettyClientHandler} instance
 * held by the enclosing object. The codec setup must match the server side.
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // Arguments: reader idle time, writer idle time, all idle time, time unit.
            .addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS))
            // Transport protocol: Protobuf.
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            // Business logic handler.
            .addLast("nettyClientHandler", nettyClientHandler);
}
/**
 * Builds the server pipeline: read-idle detection, varint32-framed protobuf
 * codec for {@code UserMsg}, then the {@code nettyServerHandler} instance
 * held by the enclosing object. The codec setup must match the client side.
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // Arguments: reader idle time, writer idle time, all idle time, time unit.
            .addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS))
            // Transport protocol: Protobuf.
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            // Business logic handler.
            .addLast("nettyServerHandler", nettyServerHandler);
}
/**
 * Creates a client {@link Bootstrap} on a fresh {@link NioEventLoopGroup},
 * using NIO socket channels with SO_KEEPALIVE enabled and a varint32-framed
 * protobuf pipeline for {@code MessageBuf.JMTransfer}.
 *
 * <p>NOTE(review): the event loop group created here is not shut down by this
 * method; releasing it is left to whoever holds the returned bootstrap.
 *
 * @return the configured, not yet connected bootstrap
 */
public static final Bootstrap getBootstrap() {
    final EventLoopGroup workers = new NioEventLoopGroup();
    final ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    .addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
                    .addLast("decoder", new ProtobufDecoder(MessageBuf.JMTransfer.getDefaultInstance()))
                    .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
                    .addLast("encoder", new ProtobufEncoder())
                    .addLast("handler", new TcpClientHandler());
        }
    };
    return new Bootstrap()
            .group(workers)
            .channel(NioSocketChannel.class)
            .handler(initializer)
            .option(ChannelOption.SO_KEEPALIVE, true);
}
/**
 * Builds the Pistachio client pipeline: optional TLS, debug logging, a read
 * timeout taken from configuration (default 10000 ms), the varint32-framed
 * protobuf codec for {@code NettyPistachioProtocol.Response}, then the
 * client handler.
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline pipeline = ch.pipeline();
    // TLS is only installed when an SSL context was configured.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc(), NettyPistachioClient.HOST, NettyPistachioClient.PORT));
    }
    pipeline.addLast(new LoggingHandler(LogLevel.DEBUG))
            .addLast(new ReadTimeoutHandler(
                    ConfigurationManager.getConfiguration().getInt("Network.Netty.ClientReadTimeoutMillis",10000),
                    TimeUnit.MILLISECONDS))
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(NettyPistachioProtocol.Response.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new NettyPistachioClientHandler());
}
/**
 * Builds the world-clock server pipeline: optional TLS, the varint32-framed
 * protobuf codec for {@code WorldClockProtocol.Locations}, then the server
 * handler.
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();
    // TLS is only installed when an SSL context was configured.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }
    pipeline.addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(WorldClockProtocol.Locations.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new WorldClockServerHandler());
}
/**
 * Builds the world-clock client pipeline: optional TLS towards
 * {@code WorldClockClient.HOST}/{@code PORT}, the varint32-framed protobuf
 * codec for {@code WorldClockProtocol.LocalTimes}, then the client handler.
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline pipeline = ch.pipeline();
    // TLS is only installed when an SSL context was configured.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc(), WorldClockClient.HOST, WorldClockClient.PORT));
    }
    pipeline.addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(WorldClockProtocol.LocalTimes.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new WorldClockClientHandler());
}
/**
 * Builds the pipeline: write-idle detection (4 s), varint32-framed protobuf
 * codec for {@code ClientTransferData.ClientTransferDataProtoc}, the
 * secondary protobuf codec, then the transfer handler.
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();
    // Arguments: reader idle time, writer idle time, all idle time, time unit.
    pipeline.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
    // Inbound framing and decoding.
    pipeline.addLast(new ProtobufVarint32FrameDecoder());
    pipeline.addLast(new ProtobufDecoder(ClientTransferData.ClientTransferDataProtoc.getDefaultInstance()));
    // Outbound framing and encoding.
    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast(new ProtobufEncoder());
    pipeline.addLast(new SecondProtobufCodec());
    pipeline.addLast(new TransferHandler());
}
/**
 * Builds the pipeline: read-idle detection (30 minutes), varint32-framed
 * protobuf codec for {@code ServerTransferData.ServerTransferDataProtoc},
 * the secondary protobuf codec, then the transfer handler.
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();
    // Arguments: reader idle time, writer idle time, all idle time, time unit.
    pipeline.addLast(new IdleStateHandler(60 * 30, 0, 0, TimeUnit.SECONDS));
    // Inbound framing and decoding.
    pipeline.addLast(new ProtobufVarint32FrameDecoder());
    pipeline.addLast(new ProtobufDecoder(ServerTransferData.ServerTransferDataProtoc.getDefaultInstance()));
    // Outbound framing and encoding.
    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast(new ProtobufEncoder());
    pipeline.addLast(new SecondProtobufCodec());
    pipeline.addLast(new TransferHandler());
}
/**
 * Builds the RPC pipeline: 4-byte length-field framing, protobuf codec for
 * {@code NettyRpcProto.RpcContainer}, then an inbound handler (run on
 * {@code eventExecutor}) and an outbound handler that share one call map
 * created for this channel.
 */
@Override
protected void initChannel(T ch) throws Exception {
    // One map per initialized channel, handed to both handlers below.
    final ConcurrentHashMap<Integer, RpcCall> sharedCalls = new ConcurrentHashMap<Integer, RpcCall>();
    ch.pipeline()
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
            .addLast("protobufDecoder", new ProtobufDecoder(NettyRpcProto.RpcContainer.getDefaultInstance()))
            .addLast("frameEncoder", new LengthFieldPrepender(4))
            .addLast("protobufEncoder", new ProtobufEncoder())
            .addLast(eventExecutor, "inboundHandler", new InboundHandler(sharedCalls))
            .addLast("outboundHandler", new OutboundHandler(sharedCalls));
}
/**
 * Builds the RPC server pipeline: 4-byte length-field framing, protobuf
 * codec for {@code NettyRpcProto.RpcContainer}, then {@code handler}
 * registered to run on {@code eventExecutor}.
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
            .addLast("protobufDecoder", new ProtobufDecoder(NettyRpcProto.RpcContainer.getDefaultInstance()))
            .addLast("frameEncoder", new LengthFieldPrepender(4))
            .addLast("protobufEncoder", new ProtobufEncoder())
            .addLast(eventExecutor, "serverHandler", handler);
}
/**
 * Builds the pipeline: varint32-framed protobuf codec that decodes into
 * {@code prototype} using {@code extensionRegistry}, followed by a
 * {@code DefaultIdleListenerHandler} wrapping {@code listener}.
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(prototype,extensionRegistry))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new DefaultIdleListenerHandler<T>(listener));
}
/**
 * Configures a server-side channel: TLS first when an SSL context exists,
 * then varint32 framing plus protobuf decoding of
 * {@code WorldClockProtocol.Locations}, the matching encoders, and finally
 * the {@code WorldClockServerHandler}.
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline stages = ch.pipeline();
    if (sslCtx != null) {
        stages.addLast(sslCtx.newHandler(ch.alloc()));
    }
    stages.addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(WorldClockProtocol.Locations.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new WorldClockServerHandler());
}
/**
 * Configures a client-side channel: TLS first when an SSL context exists,
 * then varint32 framing plus protobuf decoding of
 * {@code WorldClockProtocol.LocalTimes}, the matching encoders, and finally
 * the {@code WorldClockClientHandler}.
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline stages = ch.pipeline();
    if (sslCtx != null) {
        stages.addLast(sslCtx.newHandler(ch.alloc(), WorldClockClient.HOST, WorldClockClient.PORT));
    }
    stages.addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(WorldClockProtocol.LocalTimes.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new WorldClockClientHandler());
}
/**
 * Builds the server pipeline: a {@code MonitorServerHandler} at the head,
 * the varint32-framed protobuf codec decoding into {@code defaultInstance},
 * then the configured {@code handler}.
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    channel.pipeline()
            .addLast("MonitorServerHandler", new MonitorServerHandler())
            .addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
            .addLast("protobufDecoder", new ProtobufDecoder(defaultInstance))
            .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
            .addLast("protobufEncoder", new ProtobufEncoder())
            .addLast("handler", handler);
}
/**
 * Builds the client pipeline: idle detection (reader idle = {@code idleTimeout},
 * writer idle = half of it, in milliseconds), an optional
 * {@code MonitorClientHandler} when {@code hangDetection} is set, the
 * varint32-framed protobuf codec decoding into {@code defaultInstance},
 * then the configured {@code handler}.
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    final ChannelPipeline stages = channel.pipeline();
    stages.addLast("idleStateHandler",
            new IdleStateHandler(idleTimeout, idleTimeout / 2, 0, TimeUnit.MILLISECONDS));
    if (hangDetection) {
        stages.addLast("MonitorClientHandler", new MonitorClientHandler());
    }
    stages.addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
            .addLast("protobufDecoder", new ProtobufDecoder(defaultInstance))
            .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
            .addLast("protobufEncoder", new ProtobufEncoder())
            .addLast("handler", handler);
}
/**
 * Builds the pipeline: 4-byte length-field framing capped at 1 GB, the
 * pre-built protobuf {@code decoder}, a protobuf encoder, then the status,
 * submit and cancel handlers. Trace messages bracket the setup.
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    SpliceLogUtils.trace(LOG, "Creating new channel pipeline...");
    channel.pipeline()
            // Max frame size is 1 GB = 2^30 bytes.
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1 << 30, 0, 4, 0, 4))
            .addLast("protobufDecoder", decoder)
            .addLast("frameEncoder", new LengthFieldPrepender(4))
            .addLast("protobufEncoder", new ProtobufEncoder())
            .addLast("statusHandler", statusHandler)
            .addLast("submitHandler", submitHandler)
            .addLast("cancelHandler", cancelHandler);
    SpliceLogUtils.trace(LOG, "Done creating channel pipeline");
}
/**
 * Starts a Netty TCP server on port 8855 that speaks length-prefixed
 * (2-byte length field) protobuf {@code MessageProtobuf.Msg} messages and
 * dispatches them to {@code ServerHandler}. Blocks until the listening
 * channel is closed, then shuts both event loop groups down.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // The boss group accepts connections; the worker group handles reads/writes.
    EventLoopGroup boss = new NioEventLoopGroup();
    EventLoopGroup worker = new NioEventLoopGroup();
    try {
        // Server bootstrap helper.
        ServerBootstrap bootstrap = new ServerBootstrap();
        // Event loop groups.
        bootstrap.group(boss, worker);
        // Channel implementation for the listening socket.
        bootstrap.channel(NioServerSocketChannel.class);
        // Pipeline factory for accepted connections.
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast("frameEncoder", new LengthFieldPrepender(2));
                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535,
                        0, 2, 0, 2));
                pipeline.addLast(new ProtobufDecoder(MessageProtobuf.Msg.getDefaultInstance()));
                pipeline.addLast(new ProtobufEncoder());
                // Application handler.
                pipeline.addLast(new ServerHandler());
            }
        });
        // TCP parameters.
        // Accept backlog size (applies to the ServerSocketChannel).
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // Keep connections alive and detect dead peers (applies to accepted SocketChannels).
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        // Disable Nagle's algorithm (no delayed sends).
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        // Bind the port.
        ChannelFuture future = bootstrap.bind(8855).sync();
        System.out.println("server start ...... ");
        // Wait until the listening channel is closed.
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the interrupt flag so code running after this method can
        // still observe that the thread was interrupted.
        Thread.currentThread().interrupt();
    } finally {
        // Graceful shutdown: release the event loop threads.
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}
/**
 * Returns the {@code encoder} held by the enclosing object.
 *
 * @return the current {@code ProtobufEncoder} reference, as stored
 */
public ProtobufEncoder getEncoder() {
return encoder;
}
/**
 * Builds the Pistachio server pipeline: optional TLS, debug logging, a read
 * timeout taken from configuration (default 10000 ms), the varint32-framed
 * protobuf codec for {@code NettyPistachioProtocol.Request}, then a server
 * handler wrapping {@code handler}.
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();
    // TLS is only installed when an SSL context was configured.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }
    pipeline.addLast(new LoggingHandler(LogLevel.DEBUG))
            .addLast(new ReadTimeoutHandler(
                    ConfigurationManager.getConfiguration().getInt("Network.Netty.ServerReadTimeoutMillis",10000),
                    TimeUnit.MILLISECONDS))
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(NettyPistachioProtocol.Request.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new NettyPistachioServerHandler(handler));
}
/**
 * Connects to the push server at {@code host:port} using a varint32-framed
 * protobuf pipeline for {@code CommandProtoc.PushMessage}.
 *
 * <p>Returns immediately (after stopping the reconnection alarm) when already
 * connected or when another connect attempt is in progress. Otherwise it
 * blocks until the connect attempt completes, updates {@code connectState},
 * and on success invokes {@code connectionListener.callback(null)} and starts
 * the heartbeat. Exceptions raised during the attempt are caught, logged and
 * answered by scheduling a reconnection.
 *
 * @param host remote host name or address
 * @param port remote port
 * @param connectionListener notified on a successful connection; may be {@code null}
 * @throws Exception declared by the signature; failures of the connect attempt
 *         itself are handled inside this method
 */
@SuppressWarnings("rawtypes")
public void connect(final String host, final int port, final INettyHandlerListener connectionListener) throws Exception {
    Log.i(getClass().getName(), "connect come in!connectState=" + connectState);
    if (isConnected() || connectState == CONNECT_PROCESSORING) {
        // Already connected (or connecting): stop the reconnection alarm.
        NettyAlarmManager.stopReconnection();
        return;
    }
    Log.i(getClass().getName(), "connect come in!CONNECT_PROCESSORING!");
    connectState = CONNECT_PROCESSORING;
    mHost = host;
    mPort = port;
    System.setProperty("java.net.preferIPv4Stack", "true");
    System.setProperty("java.net.preferIPv6Addresses", "false");
    ChannelFuture channelFuture = null;
    group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group);
        b.channel(NioSocketChannel.class);
        b.option(ChannelOption.SO_KEEPALIVE, true);
        b.option(ChannelOption.TCP_NODELAY, true);
        b.remoteAddress(new InetSocketAddress(mHost, mPort));
        // Extension registry shared by the decoder of every channel this bootstrap creates.
        final ExtensionRegistry registry = ExtensionRegistry.newInstance();
        CommandProtoc.registerAllExtensions(registry);
        b.handler(new ChannelInitializer<SocketChannel>() {
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
                pipeline.addLast("protobufDecoder", new ProtobufDecoder(CommandProtoc.PushMessage.getDefaultInstance(), registry));
                pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
                pipeline.addLast("protobufEncoder", new ProtobufEncoder());
                pipeline.addLast(new PushMessageHandler(connectionManager, mNettyProcessorHandler));
            }
        });
        channelFuture = b.connect().sync();
        channelFuture.addListener(new ChannelFutureListener() {
            @SuppressWarnings("unchecked")
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    SocketAddress sa = future.channel().remoteAddress();
                    if(sa!=null){
                        Log.i(getClass().getName(), "netty server connected success! host:" + sa);
                        // Connected.
                        connectState = CONNECT_SUCCESS;
                        if (connectionListener != null) {
                            connectionListener.callback(null);
                        }
                        // Start the heartbeat.
                        NettyAlarmManager.startHeart(mContext);
                        // Connected: stop the reconnection alarm.
                        NettyAlarmManager.stopReconnection();
                    }else{
                        Log.i(getClass().getName(), "netty server connected failed! host:" + sa);
                        // Treated as a failed connection.
                        connectState = CONNECT_FAILED;
                        // A successful future has no cause; dereferencing it
                        // unconditionally here would throw a NullPointerException.
                        Throwable cause = future.cause();
                        if (cause != null) {
                            cause.printStackTrace();
                        }
                        future.channel().close();
                    }
                } else {
                    Log.i(getClass().getName(), "netty server attemp failed! host:" + future.channel().remoteAddress());
                    // Connection failed.
                    connectState = CONNECT_FAILED;
                    Throwable cause = future.cause();
                    if (cause != null) {
                        cause.printStackTrace();
                    }
                    future.channel().close();
                    // NettyAlarmManager.startReconnection(mContext);
                    // if (mNetworkCallback != null) {
                    // mNetworkCallback.connectFailed();
                    // }
                }
            }
        });
        // Wait until the connection is closed.
        // channelFuture.channel().closeFuture().sync();
    } catch (Exception e) {
        // getMessage() may be null; never hand a null message to the logger.
        Log.i(getClass().getName(), String.valueOf(e.getMessage()));
        connectState = CONNECT_EXCEPTION;
        // Schedule a reconnection after the failed attempt.
        NettyAlarmManager.startReconnection(mContext);
    } finally {
        Log.i(getClass().getName(), "connect finally!connectState=" + connectState);
        // NOTE(review): disconnect(...) runs on every path, including success —
        // confirm against its implementation that this is intended.
        disconnect(channelFuture);
    }
}