下面列出了 io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender #io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline();
/*
* 解码和编码,应和服务端一致
* */
//入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
//传输的协议 Protobuf
ph.addLast(new ProtobufVarint32FrameDecoder());
ph.addLast(new ProtobufDecoder(UserMsg.User.getDefaultInstance()));
ph.addLast(new ProtobufVarint32LengthFieldPrepender());
ph.addLast(new ProtobufEncoder());
//业务逻辑实现类
ph.addLast("nettyClientHandler",new NettyClientHandler());
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline();
//入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
// 解码和编码,应和客户端一致
//传输的协议 Protobuf
ph.addLast(new ProtobufVarint32FrameDecoder());
ph.addLast(new ProtobufDecoder(UserMsg.User.getDefaultInstance()));
ph.addLast(new ProtobufVarint32LengthFieldPrepender());
ph.addLast(new ProtobufEncoder());
//业务逻辑实现类
ph.addLast("nettyServerHandler", new NettyServerHandler());
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline();
/*
* 解码和编码,应和服务端一致
* */
//入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
//传输的协议 Protobuf
ph.addLast(new ProtobufVarint32FrameDecoder());
ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
ph.addLast(new ProtobufVarint32LengthFieldPrepender());
ph.addLast(new ProtobufEncoder());
//业务逻辑实现类
ph.addLast("nettyClientHandler",nettyClientHandler);
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline();
//入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
// 解码和编码,应和客户端一致
//传输的协议 Protobuf
ph.addLast(new ProtobufVarint32FrameDecoder());
ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
ph.addLast(new ProtobufVarint32LengthFieldPrepender());
ph.addLast(new ProtobufEncoder());
//业务逻辑实现类
ph.addLast("nettyServerHandler", nettyServerHandler);
}
public static Channel connect(ByteToMessageDecoder decoder) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup(1);
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(256 * 1024));
ch.config().setOption(ChannelOption.SO_RCVBUF, 256 * 1024);
ch.config().setOption(ChannelOption.SO_BACKLOG, 1024);
ch.pipeline()
.addLast("readTimeoutHandler", new ReadTimeoutHandler(600, TimeUnit.SECONDS))
.addLast("writeTimeoutHandler", new WriteTimeoutHandler(600, TimeUnit.SECONDS));
ch.pipeline().addLast("protoPender", new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast("lengthDecode", new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast("handshakeHandler", decoder);
ch.closeFuture();
}
}).option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
.option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, DefaultMessageSizeEstimator.DEFAULT);
return b.connect("127.0.0.1", port).sync().channel();
}
/**
* Init Bootstrap
*/
public static final Bootstrap getBootstrap() {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group);
b.channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
pipeline.addLast("decoder", new ProtobufDecoder(MessageBuf.JMTransfer.getDefaultInstance()));
pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast("handler", new TcpClientHandler());
}
});
b.option(ChannelOption.SO_KEEPALIVE, true);
return b;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), NettyPistachioClient.HOST, NettyPistachioClient.PORT));
}
LogLevel level = LogLevel.DEBUG;
p.addLast(new LoggingHandler(level));
p.addLast(new ReadTimeoutHandler(ConfigurationManager.getConfiguration().getInt("Network.Netty.ClientReadTimeoutMillis",10000), TimeUnit.MILLISECONDS));
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(NettyPistachioProtocol.Response.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new NettyPistachioClientHandler());
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(WorldClockProtocol.Locations.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new WorldClockServerHandler());
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), WorldClockClient.HOST, WorldClockClient.PORT));
}
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(WorldClockProtocol.LocalTimes.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new WorldClockClientHandler());
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS))
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(ClientTransferData.ClientTransferDataProtoc.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new SecondProtobufCodec())
.addLast(new TransferHandler());
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new IdleStateHandler(60 * 30, 0, 0, TimeUnit.SECONDS))
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(ServerTransferData.ServerTransferDataProtoc.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new SecondProtobufCodec())
.addLast(new TransferHandler());
}
private void start() throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup(1);
try {
while (!shutdown) {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
public void initChannel(NioDatagramChannel ch)
throws Exception {
ch.pipeline().addLast(stats.udp);
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new PacketDecoder());
MessageHandler messageHandler = new MessageHandler(ch, backupManager);
backupManager.setMessageHandler(messageHandler);
ch.pipeline().addLast(messageHandler);
}
});
channel = b.bind(port).sync().channel();
logger.info("Backup server started, bind port {}", port);
channel.closeFuture().sync();
if (shutdown) {
logger.info("Shutdown backup BackupServer");
break;
}
logger.warn("Restart backup server ...");
}
} catch (Exception e) {
logger.error("Start backup server with port {} failed.", port, e);
} finally {
group.shutdownGracefully().sync();
}
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(prototype,extensionRegistry));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new DefaultIdleListenerHandler<T>(listener));
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ProtobufAdapter adapter = new ProtobufAdapter(config);
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", new TcpServerHandler(config));
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(WorldClockProtocol.Locations.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new WorldClockServerHandler());
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), WorldClockClient.HOST, WorldClockClient.PORT));
}
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(WorldClockProtocol.LocalTimes.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new WorldClockClientHandler());
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("MonitorServerHandler", new MonitorServerHandler());
pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
pipeline.addLast("handler", handler);
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("idleStateHandler",
new IdleStateHandler(idleTimeout, idleTimeout / 2, 0, TimeUnit.MILLISECONDS));
if (hangDetection) {
pipeline.addLast("MonitorClientHandler", new MonitorClientHandler());
}
pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
pipeline.addLast("handler", handler);
}
public void start() throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup(args.getUdpNettyWorkThreadNum());
try {
discoveryExecutor = new DiscoveryExecutor(nodeManager);
discoveryExecutor.start();
while (!shutdown) {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
public void initChannel(NioDatagramChannel ch)
throws Exception {
ch.pipeline().addLast(stats.udp);
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new PacketDecoder());
MessageHandler messageHandler = new MessageHandler(ch, nodeManager);
nodeManager.setMessageSender(messageHandler);
ch.pipeline().addLast(messageHandler);
}
});
channel = b.bind(port).sync().channel();
logger.info("Discovery server started, bind port {}", port);
channel.closeFuture().sync();
if (shutdown) {
logger.info("Shutdown discovery server");
break;
}
logger.warn(" Restart discovery server after 5 sec pause...");
Thread.sleep(5000);
}
} catch (Exception e) {
logger.error("Start discovery server with port {} failed.", port, e);
} finally {
group.shutdownGracefully().sync();
}
}
/**
* ********** Service startup/registration and shutdown/termination **************
*/
@Override
protected void doStart() {
// must start the fiber up early.
fiber = fiberSupplier.getNewFiber(this::failModule);
setupEventChannelSubscription();
fiber.start();
C5Futures.addCallback(getDependedOnModules(),
(ignore) -> {
ChannelInitializer<SocketChannel> initer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("frameDecode", new ProtobufVarint32FrameDecoder());
p.addLast("pbufDecode", new ProtostuffDecoder<>(ReplicationWireMessage.getSchema()));
p.addLast("frameEncode", new ProtobufVarint32LengthFieldPrepender());
p.addLast("pbufEncoder", new ProtostuffEncoder<ReplicationWireMessage>());
p.addLast(new MessageHandler());
}
};
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, 100)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(initer);
//noinspection RedundantCast
serverBootstrap.bind(port).addListener((ChannelFutureListener)
future -> {
if (future.isSuccess()) {
LOG.info("successfully bound node {} port {} ", nodeId, port);
listenChannel = future.channel();
} else {
LOG.error("Unable to bind! ", future.cause());
failModule(future.cause());
}
});
outgoingBootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(initer);
//noinspection Convert2MethodRef
outgoingRequests.subscribe(fiber, message -> handleOutgoingMessage(message),
// Clean up cancelled requests.
message -> handleCancelledSession(message.getSession())
);
notifyStarted();
},
(Throwable t) -> {
LOG.error("ReplicatorService unable to retrieve modules!", t);
failModule(t);
}, fiber);
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
LogLevel level = LogLevel.DEBUG;
p.addLast(new LoggingHandler(level));
p.addLast(new ReadTimeoutHandler(ConfigurationManager.getConfiguration().getInt("Network.Netty.ServerReadTimeoutMillis",10000), TimeUnit.MILLISECONDS));
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(NettyPistachioProtocol.Request.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new NettyPistachioServerHandler(handler));
}
/**
* 连接方法
*
* @param host
* @param port
* @throws Exception
*/
@SuppressWarnings("rawtypes")
public void connect(final String host, final int port, final INettyHandlerListener connectionListener) throws Exception {
Log.i(getClass().getName(), "connect come in!connectState=" + connectState);
if (isConnected() || connectState == CONNECT_PROCESSORING) {
// 連接成功 停止重连
NettyAlarmManager.stopReconnection();
return;
}
Log.i(getClass().getName(), "connect come in!CONNECT_PROCESSORING!");
connectState = CONNECT_PROCESSORING;
mHost = host;
mPort = port;
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("java.net.preferIPv6Addresses", "false");
ChannelFuture channelFuture = null;
group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.TCP_NODELAY, true);
b.remoteAddress(new InetSocketAddress(mHost, mPort));
// 有连接到达时会创建一个channel
final ExtensionRegistry registry = ExtensionRegistry.newInstance();
CommandProtoc.registerAllExtensions(registry);
b.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
pipeline.addLast("protobufDecoder", new ProtobufDecoder(CommandProtoc.PushMessage.getDefaultInstance(), registry));
pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
pipeline.addLast(new PushMessageHandler(connectionManager, mNettyProcessorHandler));
}
});
channelFuture = b.connect().sync();
channelFuture.addListener(new ChannelFutureListener() {
@SuppressWarnings("unchecked")
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
SocketAddress sa = future.channel().remoteAddress();
if(sa!=null){
Log.i(getClass().getName(), "netty server connected success! host:" + sa);
// 連接成功
connectState = CONNECT_SUCCESS;
if (connectionListener != null) {
connectionListener.callback(null);
}
// 启动心跳程序
NettyAlarmManager.startHeart(mContext);
// 連接成功 停止重连
NettyAlarmManager.stopReconnection();
}else{
Log.i(getClass().getName(), "netty server connected failed! host:" + sa);
// 連接失敗
connectState = CONNECT_FAILED;
// 連接 失敗 啟動重連
future.cause().printStackTrace();
future.channel().close();
}
} else {
Log.i(getClass().getName(), "netty server attemp failed! host:" + future.channel().remoteAddress());
// 連接失敗
connectState = CONNECT_FAILED;
// 連接 失敗 啟動重連
future.cause().printStackTrace();
future.channel().close();
// NettyAlarmManager.startReconnection(mContext);
// if (mNetworkCallback != null) {
// mNetworkCallback.connectFailed();
// }
}
}
});
// Wait until the connection is closed.
// channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
Log.i(getClass().getName(), e.getMessage());
connectState = CONNECT_EXCEPTION;
// 连接关闭后启动重连
NettyAlarmManager.startReconnection(mContext);
} finally {
Log.i(getClass().getName(), "connect finally!connectState=" + connectState);
disconnect(channelFuture);
}
}