下面列出了io.netty.channel.ChannelDuplexHandler#io.netty.handler.timeout.IdleStateHandler 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new IdleStateHandler(0, 0, idleTimeInSeconds));
if(sslContext != null){
p.addLast(sslContext.newHandler(ch.alloc()));
}
CodecInitializer initializer = getCodecInitializer();
if(initializer != null){
List<ChannelHandler> handlers = new ArrayList<>();
initializer.initPipeline(handlers);
for(ChannelHandler handler : handlers){
p.addLast(handler);
}
}
p.addLast(this.nettyToIoAdaptor);
}
private void initializePlainTcpTransport(IMessaging messaging, Properties props)
throws IOException {
final NettyMQTTHandler mqttHandler = new NettyMQTTHandler();
final PubsubHandler handler = new PubsubHandler(pubsub, mqttHandler);
handler.setMessaging(messaging);
String host = props.getProperty(Constants.HOST_PROPERTY_NAME);
int port = Integer.parseInt(props.getProperty(Constants.PORT_PROPERTY_NAME));
initFactory(host, port, new PipelineInitializer() {
@Override
void init(ChannelPipeline pipeline) {
pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0,
Constants.DEFAULT_CONNECT_TIMEOUT));
pipeline.addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimeoutHandler());
//pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(bytesMetricsCollector));
pipeline.addLast("decoder", new MQTTDecoder());
pipeline.addLast("encoder", new MQTTEncoder());
pipeline.addLast("metrics", new MessageMetricsHandler(metricsCollector));
pipeline.addLast("handler", handler);
}
});
}
private Channel tcpServer(int port) throws Exception {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addFirst("idle", new IdleStateHandler(
0,
0,
weEventConfig.getKeepAlive()));
//channelPipeline.addLast("ssl", getSslHandler(sslContext, socketChannel.alloc()));
channelPipeline.addLast("decoder", new MqttDecoder());
channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
channelPipeline.addLast("broker", new TcpHandler(protocolProcess));
}
});
return serverBootstrap.bind(port).sync().channel();
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new IdleStateHandler(0, 0, 60));
pipeline.addLast(new WebSocketServerProtocolHandler(WEB_SOCKET_PATH, null, true));
pipeline.addLast(new WebSocketHandler(litchi));
for (ChannelHandler handler : handlers) {
pipeline.addLast(handler);
}
}
protected ChannelPipeline getDefaulHttpChannelPipeline(Channel channel) throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = channel.pipeline();
SslHandler sslHandler = configureServerSSLOnDemand();
if (sslHandler != null) {
LOG.log(Level.FINE,
"Server SSL handler configured and added as an interceptor against the ChannelPipeline: {}",
sslHandler);
pipeline.addLast("ssl", sslHandler);
}
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("aggregator", new HttpObjectAggregator(maxChunkContentSize));
// Remove the following line if you don't want automatic content
// compression.
pipeline.addLast("deflater", new HttpContentCompressor());
// Set up the idle handler
pipeline.addLast("idle", new IdleStateHandler(nettyHttpServerEngine.getReadIdleTime(),
nettyHttpServerEngine.getWriteIdleTime(), 0));
return pipeline;
}
@Override
protected void addSpecialHandlers(@NotNull final Channel ch) throws SslException {
final int handshakeTimeout = tlsTcpListener.getTls().getHandshakeTimeout();
final IdleStateHandler idleStateHandler = new IdleStateHandler(handshakeTimeout, 0, 0, TimeUnit.MILLISECONDS);
final NoTlsHandshakeIdleHandler noTlsHandshakeIdleHandler = new NoTlsHandshakeIdleHandler(eventLog);
if (handshakeTimeout > 0) {
ch.pipeline().addLast(NEW_CONNECTION_IDLE_HANDLER, idleStateHandler);
ch.pipeline().addLast(NO_TLS_HANDSHAKE_IDLE_EVENT_HANDLER, noTlsHandshakeIdleHandler);
}
final Tls tls = tlsTcpListener.getTls();
final SslHandler sslHandler = sslFactory.getSslHandler(ch, tls);
sslHandler.handshakeFuture().addListener(future -> {
if (handshakeTimeout > 0) {
ch.pipeline().remove(idleStateHandler);
ch.pipeline().remove(noTlsHandshakeIdleHandler);
}
addNoConnectIdleHandlerAfterTlsHandshake(ch);
});
new SslInitializer(sslHandler, tls, eventLog, sslParameterHandler).addHandlers(ch);
}
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldPrepender(4, false));
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast(new IdleStateHandler(managerProperties.getCheckTime(),
managerProperties.getCheckTime(), managerProperties.getCheckTime(), TimeUnit.MILLISECONDS));
ch.pipeline().addLast(new ObjectSerializerEncoder());
ch.pipeline().addLast(new ObjectSerializerDecoder());
ch.pipeline().addLast(rpcCmdDecoder);
ch.pipeline().addLast(new RpcCmdEncoder());
ch.pipeline().addLast(socketManagerInitHandler);
ch.pipeline().addLast(rpcAnswerHandler);
}
@Override
protected void initChannel( SocketChannel _channel ) throws Exception {
ChannelPipeline pipeline = _channel.pipeline();
// 是否使用客户端模式
if( CommonConfig.getBoolean( "jees.jsts.websocket.ssl.enable", false ) ){
SSLEngine engine = sslContext1.createSSLEngine();
// 是否需要验证客户端
engine.setUseClientMode(false);
// engine.setNeedClientAuth(false);
pipeline.addFirst("ssl", new SslHandler( engine ));
}
pipeline.addLast( new IdleStateHandler( 100 , 0 , 0 , TimeUnit.SECONDS ) );
pipeline.addLast( new HttpServerCodec() );
pipeline.addLast( new ChunkedWriteHandler() );
pipeline.addLast( new HttpObjectAggregator( 8192 ) );
pipeline.addLast( new WebSocketServerProtocolHandler( CommonConfig.getString( ISocketBase.Netty_WebSocket_Url, "/" ) ) );
pipeline.addLast( CommonContextHolder.getBean( WebSocketHandler.class ) );
}
@Test
public void test_no_connect_idle_handler_default() throws Exception {
final IdleStateHandler[] idleStateHandler = new IdleStateHandler[1];
when(pipeline.addAfter(anyString(), anyString(), any(ChannelHandler.class))).thenAnswer(
new Answer<ChannelPipeline>() {
@Override
public ChannelPipeline answer(final InvocationOnMock invocation) throws Throwable {
if (invocation.getArguments()[1].equals(NEW_CONNECTION_IDLE_HANDLER)) {
idleStateHandler[0] = (IdleStateHandler) (invocation.getArguments()[2]);
}
return pipeline;
}
});
abstractChannelInitializer.initChannel(socketChannel);
assertEquals(500, idleStateHandler[0].getReaderIdleTimeInMillis());
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// p.addLast("log", new LoggingHandler(LogLevel.ERROR));
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new HttpRequestDecoder());
p.addLast(new HttpResponseEncoder());
p.addLast("http compressor", new HttpContentCompressor());
// p.addLast(new HttpObjectAggregator(1048576));
p.addLast("http dispatcher", reqDis);
p.addLast("idleStateHandler", new IdleStateHandler(30, 10, 0));
p.addLast("heartbeatHandler", new HeartbeatHandler());
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline();
//入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
// 解码和编码,应和客户端一致
//传输的协议 Protobuf
ph.addLast(new ProtobufVarint32FrameDecoder());
ph.addLast(new ProtobufDecoder(UserMsg.User.getDefaultInstance()));
ph.addLast(new ProtobufVarint32LengthFieldPrepender());
ph.addLast(new ProtobufEncoder());
//业务逻辑实现类
ph.addLast("nettyServerHandler", new NettyServerHandler());
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
//添加闲置处理,60秒没有数据传输,触发事件
pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
//将字节解码为HttpMessage对象,并将HttpMessage对象编码为字节
pipeline.addLast(new HttpServerCodec());
//出站数据压缩
pipeline.addLast(new HttpContentCompressor());
//聚合多个HttpMessage为单个FullHttpRequest
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
//如果被请求的端点是/ws,则处理该升级握手
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//聊天消息处理
pipeline.addLast(new ChatServerHandler());
//心跳处理
pipeline.addLast(new HeartbeatHandler());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
encoder = new SocketEncoder(cipher);
decoder = new SocketDecoder(cipher);
channel.pipeline().addBefore("ClientSocket", "AliveAck", new IdleStateHandler(20, 15, 0));
channel.pipeline().addBefore("ClientSocket", "SocketEncoder", encoder);
channel.pipeline().addBefore("ClientSocket", "SocketDecoder", decoder);
OutPacket packet = new OutPacket(Integer.MAX_VALUE);
packet.encodeShort(14);
packet.encodeShort(OrionConfig.CLIENT_VER);
packet.encodeString(OrionConfig.CLIENT_PATCH);
packet.encodeInt(cipher.getSeqRcv());
packet.encodeInt(cipher.getSeqSnd());
packet.encodeByte(OrionConfig.GAME_LOCALE);
acceptTime = System.currentTimeMillis();
channel.writeAndFlush(Unpooled.wrappedBuffer(packet.toArray()));
super.channelActive(ctx);
}
private Bootstrap createBootstrap() {
return new Bootstrap().group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
//*空闲状态的handler
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
//管理连接的
new NettyConnetManageHandler(NettyClient.this),
//处理具体业务逻辑的handler
new NettyClientHandler(NettyClient.this));
}
});
}
protected Bootstrap initBootstrap() {
AbstractByteBufAllocator allocator =
SoaSystemEnvProperties.SOA_POOLED_BYTEBUF ?
PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT;
bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.ALLOCATOR, allocator);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds),
new SoaFrameDecoder(), //粘包和断包处理
new SoaIdleHandler(),
new SoaClientHandler(callBack));
}
});
return bootstrap;
}
private void initChannel(ChannelPipeline pipeline) {
pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);
if (this.options.getMaxMessageSize() > 0) {
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
} else {
// max message size not set, so the default from Netty MQTT codec is used
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
}
// adding the idle state handler for timeout on CONNECT packet
pipeline.addBefore("handler", "idle", new IdleStateHandler(this.options.timeoutOnConnect(), 0, 0));
pipeline.addBefore("handler", "timeoutOnConnect", new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
// as MQTT 3.1.1 describes, if no packet is sent after a "reasonable" time (here CONNECT timeout)
// the connection is closed
ctx.channel().close();
}
}
}
});
}
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS))
.addLast(new HeartHandle())
;
}
@Override
public synchronized void start() {
bossGroup = new NioEventLoopGroup(); // (1)
workerGroup = new NioEventLoopGroup();
try {
b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ByteArrayDecoder());
ch.pipeline().addLast(new ByteArrayEncoder());
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast(new IdleStateHandler(heartTime, heartTime, heartTime, TimeUnit.SECONDS));
ch.pipeline().addLast(new DeliveryHandler(deliveryService));
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
b.bind(settingService.getDeliveryPort());
logger.info("socket: "+settingService.getDeliveryPort()+" starting....");
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
} catch (Exception e) {
e.printStackTrace();
}
}
public void channelCreated(Channel channel) throws Exception {
if (LOG.isInfoEnabled()) {
LOG.info("channel created : {}", channel.toString());
}
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new IdleStateHandler(readTimeout, 0, idleTimeout, TimeUnit.MILLISECONDS));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new FastdfsHandler());
}
private void bind() throws Exception {
this.bossGroup = new NioEventLoopGroup();
this.workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(30, 0, 0, TimeUnit.MINUTES));
// 1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(new byte[]{delimiter}), Unpooled.wrappedBuffer(new byte[]{delimiter, delimiter})));
ch.pipeline().addLast(inboundHandler);
}
});
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
this.log.info("TCP服务启动完毕,port={}", this.port);
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
}
/**
* Initialize our {@link ChannelPipeline} to connect the upstream server.
* LittleProxy acts as a client here.
*
* A {@link ChannelPipeline} invokes the read (Inbound) handlers in
* ascending ordering of the list and then the write (Outbound) handlers in
* descending ordering.
*
* Regarding the Javadoc of {@link HttpObjectAggregator} it's needed to have
* the {@link HttpResponseEncoder} or {@link HttpRequestEncoder} before the
* {@link HttpObjectAggregator} in the {@link ChannelPipeline}.
*
* @param pipeline
* @param httpRequest
*/
private void initChannelPipeline(ChannelPipeline pipeline,
HttpRequest httpRequest) {
if (trafficHandler != null) {
pipeline.addLast("global-traffic-shaping", trafficHandler);
}
pipeline.addLast("bytesReadMonitor", bytesReadMonitor);
pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("decoder", new HeadAwareHttpResponseDecoder(
proxyServer.getMaxInitialLineLength(),
proxyServer.getMaxHeaderSize(),
proxyServer.getMaxChunkSize()));
// Enable aggregation for filtering if necessary
int numberOfBytesToBuffer = proxyServer.getFiltersSource()
.getMaximumResponseBufferSizeInBytes();
if (numberOfBytesToBuffer > 0) {
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
pipeline.addLast("responseReadMonitor", responseReadMonitor);
pipeline.addLast("requestWrittenMonitor", requestWrittenMonitor);
// Set idle timeout
pipeline.addLast(
"idle",
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
pipeline.addLast("handler", this);
}
public void start() {
ServerBootstrap b = new ServerBootstrap();
b.group(m_bossGroup, m_workerGroup)//
.channel(NioServerSocketChannel.class)//
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(10),//
new StringDecoder(Charsets.UTF_8),//
new IdleStateHandler(0, 0, MAX_IDLE_SECONDS),//
new ShutdownRequestInboundHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128) // TODO set tcp options
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(m_config.getShutdownRequestPort());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.info("Broker shutdown port is {}.", m_config.getShutdownRequestPort());
} else {
log.error("Failed to listen shutdown port {}.", m_config.getShutdownRequestPort());
}
}
});
}
protected void releaseHandlers(PooledConnection conn) {
final ChannelPipeline pipeline = conn.getChannel().pipeline();
removeHandlerFromPipeline(OriginResponseReceiver.CHANNEL_HANDLER_NAME, pipeline);
// The Outbound handler is always after the inbound handler, so look for it.
ChannelHandlerContext passportStateHttpClientHandlerCtx =
pipeline.context(PassportStateHttpClientHandler.OutboundHandler.class);
pipeline.addAfter(passportStateHttpClientHandlerCtx.name(), IDLE_STATE_HANDLER_NAME,
new IdleStateHandler(0, 0, connPoolConfig.getIdleTimeout(), TimeUnit.MILLISECONDS));
}
private Bootstrap createBootstrap(final Endpoint endpoint, final EndpointChannel endpointChannel) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(m_eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true)//
.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_SNDBUF, m_config.getNettySendBufferSize())//
.option(ChannelOption.SO_RCVBUF, m_config.getNettyReceiveBufferSize());
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//
new DefaultNettyChannelOutboundHandler(), //
new NettyDecoder(), //
new MagicNumberAndLengthPrepender(), //
new NettyEncoder(), //
new IdleStateHandler(m_config.getEndpointChannelReadIdleTime(), //
m_config.getEndpointChannelWriteIdleTime(), //
m_config.getEndpointChannelMaxIdleTime()), //
new DefaultClientChannelInboundHandler(m_commandProcessorManager, endpoint, endpointChannel,
AbstractEndpointClient.this, m_config));
}
});
return bootstrap;
}
public Bootstrap createBooStrap(String serviceName, final CommunicationOptions communicationOptions) {
// init netty bootstrap
Bootstrap bootstrap = new Bootstrap();
if (communicationOptions.getIoEventType() == BrpcConstants.IO_EVENT_NETTY_EPOLL) {
bootstrap.channel(EpollSocketChannel.class);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
} else {
bootstrap.channel(NioSocketChannel.class);
}
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, communicationOptions.getConnectTimeoutMillis());
bootstrap.option(ChannelOption.SO_KEEPALIVE, communicationOptions.isKeepAlive());
bootstrap.option(ChannelOption.SO_REUSEADDR, communicationOptions.isReuseAddr());
bootstrap.option(ChannelOption.TCP_NODELAY, communicationOptions.isTcpNoDelay());
bootstrap.option(ChannelOption.SO_RCVBUF, communicationOptions.getReceiveBufferSize());
bootstrap.option(ChannelOption.SO_SNDBUF, communicationOptions.getSendBufferSize());
BrpcThreadPoolManager threadPoolManager = BrpcThreadPoolManager.getInstance();
boolean isSharing = communicationOptions.isGlobalThreadPoolSharing();
ThreadPool workThreadPool = threadPoolManager.getOrCreateClientWorkThreadPool(
serviceName, isSharing, communicationOptions.getWorkThreadNum());
ExecutorService exceptionThreadPool = threadPoolManager.getExceptionThreadPool();
final RpcClientHandler rpcClientHandler = new RpcClientHandler(workThreadPool, exceptionThreadPool);
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (communicationOptions.getChannelType() == ChannelType.SINGLE_CONNECTION) {
ch.pipeline().addLast(new IdleStateHandler(
0, 0, communicationOptions.getKeepAliveTime()));
ch.pipeline().addLast(new IdleChannelHandler());
}
ch.pipeline().addLast(rpcClientHandler);
}
};
EventLoopGroup ioThreadPool = threadPoolManager.getOrCreateClientIoThreadPool(
serviceName, isSharing, communicationOptions.getIoThreadNum(), communicationOptions.getIoEventType());
bootstrap.group(ioThreadPool).handler(initializer);
return bootstrap;
}
@Override
protected void customEndpointHandlers(final ChannelPipeline pipeline) {
if (environment().keepAliveInterval() > 0) {
pipeline.addLast(new IdleStateHandler(environment().keepAliveInterval(), 0, 0, TimeUnit.MILLISECONDS));
}
pipeline.addLast(new HttpClientCodec())
.addLast(new SearchHandler(this, responseBuffer(), false, false));
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("idleStateHandler",
new IdleStateHandler(idleTimeout, idleTimeout / 2, 0, TimeUnit.MILLISECONDS));
if (hangDetection) {
pipeline.addLast("MonitorClientHandler", new MonitorClientHandler());
}
pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
pipeline.addLast("handler", handler);
}
@Override
protected void customEndpointHandlers(final ChannelPipeline pipeline) {
if (environment().keepAliveInterval() > 0) {
pipeline.addLast(new IdleStateHandler(environment().keepAliveInterval(), 0, 0, TimeUnit.MILLISECONDS));
}
pipeline.addLast(new HttpClientCodec());
boolean enableV2 = Boolean.parseBoolean(System.getProperty("com.couchbase.enableYasjlQueryResponseParser", "true"));
if (!enableV2) {
pipeline.addLast(new QueryHandler(this, responseBuffer(), false, false));
} else {
pipeline.addLast(new QueryHandlerV2(this, responseBuffer(), false, false));
}
}
private void negotiateIdle(ChannelHandlerContext ctx, byte idleTimeout, byte maxIdleTimeout) {
byte serverSideIdleTimeout = ctx.channel().attr(OneTime.idleTimeout).get();
byte serverSideMaxIdleTimeout = ctx.channel().attr(ChannelAttrKeys.maxIdleTimeout).get();
if (idleTimeout != serverSideIdleTimeout) {
ChannelHandlerContext idleHandlerContext = ctx.pipeline().context(IdleStateHandler.class);
ctx.pipeline().replace(IdleStateHandler.class, idleHandlerContext.name(),
new IdleStateHandler(idleTimeout, 0, 0));
}
if (maxIdleTimeout != serverSideMaxIdleTimeout) {
ctx.channel().attr(ChannelAttrKeys.maxIdleTimeout).set(maxIdleTimeout);
}
}
private void configureMQTTPipeline(ChannelPipeline pipeline, MoquetteIdleTimeoutHandler timeoutHandler,
NewNettyMQTTHandler handler) {
pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
pipeline.addLast("autoflush", new AutoFlushHandler(1, TimeUnit.SECONDS));
pipeline.addLast("decoder", new MqttDecoder(maxBytesInMessage));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("handler", handler);
}