下面列出了io.netty.channel.ChannelDuplexHandler#io.netty.handler.timeout.IdleStateEvent 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Reacts to idle notifications raised for this channel.
 *
 * <ul>
 *   <li>{@code READER_IDLE}: closes the connection and logs the fact at INFO.</li>
 *   <li>{@code WRITER_IDLE}: writes a single 4-byte zero integer as a heartbeat.</li>
 *   <li>{@code ALL_IDLE}: only logs at DEBUG; nothing is written.</li>
 * </ul>
 *
 * Any event that is not an {@link IdleStateEvent} is handed to the next handler in the
 * pipeline instead of being dropped here.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (!(evt instanceof IdleStateEvent)) {
        // Not an idle notification: keep it travelling so later handlers still see it.
        ctx.fireUserEventTriggered(evt);
        return;
    }

    IdleState state = ((IdleStateEvent) evt).state();
    if (state == IdleState.READER_IDLE) {
        ctx.close();
        LOGGER.info(getClass().getName() + "::读超时,关闭连接:" + ctx.channel());
    } else if (state == IdleState.WRITER_IDLE) {
        // The heartbeat is one int, so allocate 4 bytes up front; an initial capacity of 1
        // would force the buffer to grow inside writeInt.
        ctx.writeAndFlush(ctx.alloc().buffer(4).writeInt(0));
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(getClass().getName() + "::写超时,发送心跳包:" + ctx.channel());
        }
    } else if (state == IdleState.ALL_IDLE) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(getClass().getName() + "::读写都超时,发送心跳包:" + ctx.channel());
        }
    }
}
/**
 * Forwards the event to the superclass, then uses any idle notification as a heartbeat tick.
 *
 * While the current time is still below {@code lastReceiverTime + HEART_FREQUENCY} a
 * heartbeat message is sent through {@code localBinder}; otherwise the connection is treated
 * as timed out, closed, and a delayed reconnect is requested.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception if the superclass handler fails
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    super.userEventTriggered(ctx, evt);

    if (!IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
        return;
    }

    final IdleState state = ((IdleStateEvent) evt).state();
    final boolean idle = state == IdleState.READER_IDLE
            || state == IdleState.WRITER_IDLE
            || state == IdleState.ALL_IDLE;
    if (!idle) {
        return;
    }

    final long now = TimeUtil.getCurrentTimeInLong();
    if (now < lastReceiverTime + HEART_FREQUENCY) {
        LogManager.getLogger().d(Tag, "userEventTriggered() ==> " + (now - lastReceiverTime));
        try {
            localBinder.connectMessage(ServiceAck.HEART_BEAT.getAck(), new byte[0], new byte[0]);
        } catch (RemoteException e) {
            // Sending the heartbeat failed: report it and schedule a reconnect.
            e.printStackTrace();
            reconDelay();
        }
    } else {
        // Nothing received within the allowed window: treat as a connect timeout.
        ctx.close();
        LogManager.getLogger().d(Tag, "userEventTriggered() ==> connect timeout" + (now - lastReceiverTime));
        reconDelay();
    }
}
/**
 * Handles idle notifications, then always delegates the event to the superclass.
 *
 * A read-idle event is logged as missed heartbeats and closes the connection; a write-idle
 * event sends a legacy-protocol heartbeat frame.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception if the superclass handler fails
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
        throws Exception {
    if (evt instanceof IdleStateEvent) {
        final IdleState state = ((IdleStateEvent) evt).state();
        if (state == IdleState.READER_IDLE) {
            logger.warn("Read idle, due to missed heartbeats, closing connection: " + ctx.channel().remoteAddress());
            ctx.close();
        } else if (state == IdleState.WRITER_IDLE) {
            // Frame layout: 4-byte length (value 1, the op code's size) followed by the
            // one-byte op code 6, the legacy protocol's heartbeat.
            final byte[] heartbeat = ByteBuffer.allocate(4 + 1)
                    .putInt(1)
                    .put((byte) 6)
                    .array();
            ctx.channel().writeAndFlush(heartbeat);
        }
    }
    super.userEventTriggered(ctx, evt);
}
/**
 * Idle-timeout handling for missing client heartbeats.
 *
 * Each read-idle event prints a notice and increments {@code idle_count}; when the counter
 * is already greater than 1 at that moment, the channel is closed first. The printed message
 * mentions 5 seconds; the actual idle interval is configured outside this method.
 * Idle events of other kinds are ignored, and non-idle events go to the superclass.
 *
 * @param ctx context of this handler
 * @param obj the user event that was fired
 * @throws Exception if the superclass handler fails
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
    if (!(obj instanceof IdleStateEvent)) {
        super.userEventTriggered(ctx, obj);
        return;
    }

    final IdleState state = ((IdleStateEvent) obj).state();
    if (state != IdleState.READER_IDLE) {
        return;
    }

    // Read side idle: no heartbeat command arrived from the client.
    System.out.println("已经5秒没有接收到客户端的信息了");
    if (idle_count > 1) {
        System.out.println("关闭这个不活跃的channel");
        ctx.channel().close();
    }
    idle_count++;
}
/**
 * Closes the channel when it has been idle in both directions and, if a channel event
 * listener is registered, queues an IDLE event for it. The user event is always passed on.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    final boolean allIdle = evt instanceof IdleStateEvent
            && ((IdleStateEvent) evt).state().equals(IdleState.ALL_IDLE);

    if (allIdle) {
        final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", peer);
        closeChannel(ctx.channel());
        if (NettyRemotingClient.this.channelEventListener != null) {
            NettyEvent idleEvent = new NettyEvent(NettyEventType.IDLE, peer, ctx.channel());
            NettyRemotingClient.this.putNettyEvent(idleEvent);
        }
    }

    ctx.fireUserEventTriggered(evt);
}
/**
 * Client-side idle handling: closes the channel on {@code ALL_IDLE} and, when a channel
 * event listener is present, publishes a remoting event named after the idle state.
 * The user event is always passed on to the next handler.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        final IdleStateEvent idleEvent = (IdleStateEvent) evt;
        final Channel wrapped = new NettyChannel(ctx);
        final String peer = RemotingHelper.parseChannelRemoteAddr(wrapped);

        if (idleEvent.state().equals(io.netty.handler.timeout.IdleState.ALL_IDLE)) {
            LOGGER.warn("CLIENT : IDLE [{}]", peer);
            closeChannel(wrapped);
        }

        if (channelEventListener != null) {
            // The remoting event type is looked up by the idle state's enum name.
            final String stateName = idleEvent.state().name();
            putRemotingEvent(new RemotingEvent(RemotingEventType.valueOf(stateName), peer, wrapped));
        }
    }
    ctx.fireUserEventTriggered(evt);
}
/**
 * Server-side idle handling. On {@code ALL_IDLE} the channel is closed and, if a channel
 * event listener is registered, an IDLE event is queued. The user event is always passed on.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // Read and write timeouts both arrive here as IdleStateEvents.
    final boolean allIdle = evt instanceof IdleStateEvent
            && ((IdleStateEvent) evt).state().equals(IdleState.ALL_IDLE);

    if (allIdle) {
        final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", peer);
        RemotingUtil.closeChannel(ctx.channel());
        if (NettyRemotingServer.this.channelEventListener != null) {
            // Put the idle event on the event queue.
            NettyEvent idleEvent = new NettyEvent(NettyEventType.IDLE, peer, ctx.channel());
            NettyRemotingServer.this.putNettyEvent(idleEvent);
        }
    }

    ctx.fireUserEventTriggered(evt);
}
/**
 * Closes the connection on every kind of idle notification and logs which one triggered it.
 * Events that are not {@link IdleStateEvent}s are ignored and not forwarded.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (!(evt instanceof IdleStateEvent)) {
        return;
    }

    switch (((IdleStateEvent) evt).state()) {
        case ALL_IDLE:
            ctx.close();
            LOG.info("close ALL_IDLE {}", evt.toString());
            break;
        case READER_IDLE:
            ctx.close();
            LOG.info("close READER_IDLE {}", evt.toString());
            break;
        case WRITER_IDLE:
            ctx.close();
            LOG.info("close WRITER_IDLE {}", evt.toString());
            break;
        default:
            break;
    }
}
/**
 * Reacts to idle notifications raised for this channel.
 *
 * <ul>
 *   <li>{@code READER_IDLE}: closes the connection and logs the fact at INFO.</li>
 *   <li>{@code WRITER_IDLE}: writes a single 4-byte zero integer as a heartbeat and
 *       registers the channel with {@code IdleConnectionManager}.</li>
 *   <li>{@code ALL_IDLE}: only logs at DEBUG; nothing is written.</li>
 * </ul>
 *
 * Any event that is not an {@link IdleStateEvent} is handed to the next handler in the
 * pipeline instead of being dropped here.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (!(evt instanceof IdleStateEvent)) {
        // Not an idle notification: keep it travelling so later handlers still see it.
        ctx.fireUserEventTriggered(evt);
        return;
    }

    IdleState state = ((IdleStateEvent) evt).state();
    if (state == IdleState.READER_IDLE) {
        ctx.close();
        LOGGER.info(getClass().getName() + "::读超时,关闭连接:" + ctx.channel());
    } else if (state == IdleState.WRITER_IDLE) {
        // The heartbeat is one int, so allocate 4 bytes up front; an initial capacity of 1
        // would force the buffer to grow inside writeInt.
        ctx.writeAndFlush(ctx.alloc().buffer(4).writeInt(0));
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(getClass().getName() + "::写超时,发送心跳包:" + ctx.channel());
        }
        // Per the original author's note, IdleConnectionManager counts write-idle
        // occurrences and closes the channel once a limit is exceeded (not visible here).
        IdleConnectionManager.addChannel(ctx.channel());
    } else if (state == IdleState.ALL_IDLE) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(getClass().getName() + "::读写都超时,发送心跳包:" + ctx.channel());
        }
    }
}
/**
 * Server-side idle handling: closes the channel on {@code ALL_IDLE} and, when a channel
 * event listener is present, publishes a remoting event named after the idle state.
 * The user event is always passed on to the next handler.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        final IdleStateEvent idleEvent = (IdleStateEvent) evt;
        final com.github.ltsopensource.remoting.Channel wrapped = new NettyChannel(ctx);
        final String peer = RemotingHelper.parseChannelRemoteAddr(wrapped);

        if (idleEvent.state().equals(IdleState.ALL_IDLE)) {
            LOGGER.warn("SERVER: IDLE [{}]", peer);
            RemotingHelper.closeChannel(wrapped);
        }

        if (channelEventListener != null) {
            // The remoting event type is looked up by the idle state's enum name.
            final String stateName = idleEvent.state().name();
            putRemotingEvent(new RemotingEvent(RemotingEventType.valueOf(stateName), peer, wrapped));
        }
    }
    ctx.fireUserEventTriggered(evt);
}
/**
 * Logs every idle notification. Only {@code ALL_IDLE} closes the connection, and in that
 * case the event is not delegated to the superclass; everything else is delegated.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception if the superclass handler fails
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        switch (((IdleStateEvent) evt).state()) {
            case WRITER_IDLE:
                NettyLog.info("WRITER_IDLE");
                break;
            case READER_IDLE:
                // Read-idle is only logged; the channel stays open.
                NettyLog.info("READER_IDLE");
                break;
            case ALL_IDLE:
                NettyLog.info("ALL_IDLE");
                ctx.close();
                return;
            default:
                break;
        }
    }
    super.userEventTriggered(ctx, evt);
}
/**
 * Idle handling for a game/client session.
 *
 * On write-idle an alive request stamped with the current time in seconds is sent. On
 * read-idle the session is torn down through {@code channelInactive}, unless a user is
 * waiting for a migrate-out result and has been doing so for no more than 30 seconds.
 * Other events are ignored and not forwarded.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception if {@code channelInactive} fails
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (!(evt instanceof IdleStateEvent)) {
        return;
    }

    final IdleState state = ((IdleStateEvent) evt).state();
    if (state == IdleState.WRITER_IDLE) {
        // Nothing written for a while: ping the client; it is expected to answer.
        channel.writeAndFlush(onAliveReq((int) (System.currentTimeMillis() / 1000)));
        return;
    }
    if (state != IdleState.READER_IDLE) {
        return;
    }

    // A migrating user gets a 30-second grace period before being disconnected; anyone
    // else has no reason to be read-idle and is disconnected immediately.
    final boolean awaitingMigrateResult =
            user != null && migrateState == MigrateState.WaitCenterMigrateOutResult;
    if (!awaitingMigrateResult || System.currentTimeMillis() - migrateOut > 30000) {
        channelInactive(ctx);
    }
}
/**
 * Disconnects a client that stayed read-idle without sending an MQTT CONNECT packet.
 * The disconnect is recorded in the event log and the event is consumed; every other
 * event is passed on to the next handler.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 */
@Override
public void userEventTriggered(final @NotNull ChannelHandlerContext ctx, final @NotNull Object evt) {
    final boolean readerIdle = evt instanceof IdleStateEvent
            && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE;
    if (!readerIdle) {
        ctx.fireUserEventTriggered(evt);
        return;
    }

    if (log.isDebugEnabled()) {
        log.debug("Client with IP {} disconnected. The client was idle for too long without sending a MQTT CONNECT packet",
                ChannelUtils.getChannelIP(ctx.channel()).or("UNKNOWN"));
    }
    eventLog.clientWasDisconnected(ctx.channel(), "No CONNECT sent in time");
    ctx.close();
}
/**
 * Disconnects a client that stayed read-idle without finishing the TLS handshake.
 * The disconnect is recorded in the event log and the event is consumed; every other
 * event is delegated to the superclass.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception if the superclass handler fails
 */
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
    final boolean readerIdle = evt instanceof IdleStateEvent
            && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE;
    if (!readerIdle) {
        super.userEventTriggered(ctx, evt);
        return;
    }

    if (log.isDebugEnabled()) {
        log.debug("Client with IP {} disconnected. The client was idle for too long without finishing the TLS handshake",
                ChannelUtils.getChannelIP(ctx.channel()).or("UNKNOWN"));
    }
    eventLog.clientWasDisconnected(ctx.channel(), "TLS handshake not finished in time");
    ctx.close();
}
/**
 * Logs every user event, then for idle notifications logs which side went idle and closes
 * the connection when {@code isReadBytes} is false. Non-idle events are not forwarded.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    log.error("userEventTriggered={}, object={}", ctx.channel(), evt);
    if (!(evt instanceof IdleStateEvent)) {
        return;
    }

    final IdleState state = ((IdleStateEvent) evt).state();
    if (state == IdleState.READER_IDLE) {
        log.debug("userEventTriggered(readIdle)={}", ctx.channel());
    } else if (state == IdleState.WRITER_IDLE) {
        log.debug("userEventTriggered(writeIdle)={}", ctx.channel());
    } else {
        log.debug("userEventTriggered(allIdle)={}", ctx.channel());
    }

    // Whatever the idle kind, the reaction is the same.
    if (!isReadBytes) {
        ctx.close();
    }
}
/**
 * Heartbeat for the long-lived connection between server and client: a WebSocket ping is
 * sent on read-idle or write-idle so the connection is not dropped. {@code ALL_IDLE} is
 * only logged. Events that are not {@link IdleStateEvent}s are ignored.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent stateEvent = (IdleStateEvent) evt;
        // The ping frame is created only in the branches that write it. Creating it up
        // front left an unused frame that was never written or released on ALL_IDLE.
        switch (stateEvent.state()) {
            // Read idle (server side)
            case READER_IDLE:
                LOGGER.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
                ctx.writeAndFlush(new PingWebSocketFrame());
                break;
            // Write idle (client side)
            case WRITER_IDLE:
                LOGGER.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");
                ctx.writeAndFlush(new PingWebSocketFrame());
                break;
            case ALL_IDLE:
                LOGGER.info("【"+ctx.channel().remoteAddress()+"】读写空闲");
                break;
            default:
                break;
        }
    }
}
/**
 * Answers every idle notification with an {@code IdleResponse} describing the event, the
 * remote address, the side ("RESPONSE") and the current time. Non-idle events are
 * delegated to the superclass.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception if the superclass handler fails
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        final IdleStateEvent idleEvent = (IdleStateEvent) evt;
        final String side = "RESPONSE";
        final String peer = ctx.channel().remoteAddress().toString();
        final Date now = new Date();

        logger.debug("[JobX] Idle request, requestType: [{}], remoteInfo: [{}], eventType: [{}]",
                side, peer, idleEvent.state());

        ctx.writeAndFlush(new IdleResponse()
                .setEvent(idleEvent)
                .setRemoteAddr(peer)
                .setSide(side)
                .setTime(now));
    } else {
        super.userEventTriggered(ctx, evt);
    }
}
/**
 * On {@code ALL_IDLE}, logs the remote address, closes the channel and, if a channel event
 * listener exists, queues an IDLE event. The user event is always passed on afterwards.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        final IdleState idleState = ((IdleStateEvent) evt).state();
        if (idleState.equals(IdleState.ALL_IDLE)) {
            final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", peer);
            closeChannel(ctx.channel());

            final boolean hasListener = NettyRemotingClient.this.channelEventListener != null;
            if (hasListener) {
                NettyRemotingClient.this.putNettyEvent(
                        new NettyEvent(NettyEventType.IDLE, peer, ctx.channel()));
            }
        }
    }
    ctx.fireUserEventTriggered(evt);
}
/**
 * Idle handling keyed by the client id stored on the channel.
 *
 * {@code READER_IDLE} and {@code ALL_IDLE} fire a channel-inactive event and close the
 * channel; {@code WRITER_IDLE} is only logged. Non-idle events are not forwarded.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    final String clientId = NettyAttrManager.getAttrClientId(ctx.channel());
    if (!(evt instanceof IdleStateEvent)) {
        return;
    }

    switch (((IdleStateEvent) evt).state()) {
        case READER_IDLE:
            // Nothing read from the client within the read timeout (per the original note,
            // 1.5x the keepalive; configured elsewhere): fire inactive and close.
            LOG.info("READER_IDLE: {}, start close channel...", clientId);
            ctx.fireChannelInactive();
            ctx.close().addListener(CLOSE_ON_FAILURE);
            break;
        case WRITER_IDLE:
            // No write happened; only logged, the channel is left open.
            LOG.info("WRITER_IDLE: {}, start close channel...", clientId);
            break;
        case ALL_IDLE:
            // Neither read nor write happened.
            LOG.info("ALL_IDLE: {}, start close channel...", clientId);
            ctx.fireChannelInactive();
            ctx.close().addListener(CLOSE_ON_FAILURE);
            break;
        default:
            break;
    }
}
/**
 * Sends a JSON "ping" message carrying this host's name whenever the channel is read-idle,
 * then delegates the event to the superclass.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception if the superclass handler fails
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
        throws Exception {
    final boolean readerIdle = evt instanceof IdleStateEvent
            && ((IdleStateEvent) evt).state().equals(IdleState.READER_IDLE);

    if (readerIdle) {
        // Build the ping payload and hand it to sendMessage for delivery.
        final JSONObject ping = new JSONObject();
        ping.put("method", "ping");
        ping.put("hostName", NETTY_HOST);
        sendMessage(ping.toString());
    }

    super.userEventTriggered(ctx, evt);
}
/**
 * Treats an {@code ALL_IDLE} notification as a dead connection: the channel is closed and
 * an IDLE event is queued when a channel event listener is registered. The user event is
 * passed on to the next handler in every case.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (isAllIdle(evt)) {
        final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", peer);
        closeChannel(ctx.channel());
        if (NettyRemotingClient.this.channelEventListener != null) {
            NettyRemotingClient.this
                    .putNettyEvent(new NettyEvent(NettyEventType.IDLE, peer, ctx.channel()));
        }
    }
    ctx.fireUserEventTriggered(evt);
}

/** Tells whether the given user event is an idle notification for both directions. */
private boolean isAllIdle(Object evt) {
    if (!(evt instanceof IdleStateEvent)) {
        return false;
    }
    return ((IdleStateEvent) evt).state().equals(IdleState.ALL_IDLE);
}
/**
 * Installs an all-idle timeout on the channel when {@code timeoutMs} is positive.
 *
 * When the timeout fires, the idle handler removes itself from the pipeline, fires the idle
 * event down the pipeline and then closes the channel.
 *
 * @param channel the channel being initialised
 */
@Override
public void init(Channel channel) {
    if (timeoutMs <= 0) {
        return;
    }
    LOGGER.debug("Channel idle timeout is {}ms.", timeoutMs);

    final IdleStateHandler idleTimeout = new IdleStateHandler(0, 0, timeoutMs, TimeUnit.MILLISECONDS) {
        @Override
        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
            if (evt.state() != IdleState.ALL_IDLE) {
                return;
            }
            // A protocol may take its own grace period to shut the channel down; removing
            // this handler keeps the idle timeout from firing again meanwhile.
            ctx.pipeline().remove(this);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Closing channel {} after {}ms of inactivity.", ctx.channel(), timeoutMs);
            }
            // Let protocols see the event first so they can prepare for the close.
            ctx.fireUserEventTriggered(evt);
            ctx.close();
        }
    };
    channel.pipeline().addLast(idleTimeout);
}
/**
 * Sends a heart-beat RPC request whenever any idle notification arrives, without
 * distinguishing between idle states. Non-idle events are ignored and not forwarded.
 *
 * @param context context of this handler
 * @param o the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext context, Object o) throws Exception {
    if (!(o instanceof IdleStateEvent)) {
        return;
    }

    // Describe the call HeartBeat#beat as an asynchronous heart-beat request.
    final NettyRpcRequest heartBeat = new NettyRpcRequest();
    heartBeat.setHeatBeat(true);
    heartBeat.setInterfaze(HeartBeat.class.getName());
    heartBeat.setMethod("beat");
    heartBeat.setSyn(false);

    if (logHeartBeat) {
        LOG.info("Send heart beat request...");
    }
    context.writeAndFlush(heartBeat);
}
/**
 * Turns idle notifications into timeout exceptions.
 *
 * A read-idle event raises {@code FastdfsReadTimeoutException} when an operation is attached
 * to the channel; an all-idle event always raises {@code FastdfsTimeoutException}. Events are
 * matched by identity against the shared {@link IdleStateEvent} constants.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception the timeout exception described above
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    final FastdfsOperation<?> pending = ctx.channel().attr(OPERATION_KEY).get();

    final boolean readIdle = evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT
            || evt == IdleStateEvent.READER_IDLE_STATE_EVENT;
    if (readIdle && null != pending) {
        throw new FastdfsReadTimeoutException(String.format("execute %s read timeout.", pending));
    }

    final boolean allIdle = evt == IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT
            || evt == IdleStateEvent.ALL_IDLE_STATE_EVENT;
    if (allIdle) {
        throw new FastdfsTimeoutException("fastdfs channel was idle timeout.");
    }
}
/**
 * Heartbeat handling on the client side.
 *
 * Read-idle and all-idle both ask {@code NettyClientService} to connect again; write-idle
 * fills in the shared {@code HEART_BEAT} message and sends it to the server.
 * Non-idle events are ignored and not forwarded.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
    if (!IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
        return;
    }

    switch (((IdleStateEvent) evt).state()) {
        case READER_IDLE:
        case ALL_IDLE:
            // Nothing received (or nothing received and nothing sent) for the configured time.
            SpringBeanUtils.getInstance().getBean(NettyClientService.class).doConnect();
            break;
        case WRITER_IDLE: {
            // Nothing sent for the configured time: emit a heartbeat.
            HEART_BEAT.setAction(NettyMessageActionEnum.HEART.getCode());
            HEART_BEAT.setMetaInfo(modelNameService.findClientMetaInfo());
            HEART_BEAT.setSerialProtocol(this.txConfig.getNettySerializer());
            final TxTransactionGroup source = new TxTransactionGroup();
            source.setSource(modelNameService.findModelName());
            HEART_BEAT.setTxTransactionGroup(source);
            ctx.writeAndFlush(HEART_BEAT);
            LogUtil.debug(LOGGER, "发送【心跳】事件到Lottor Server【{}】", () -> ctx.channel().remoteAddress().toString().substring(1));
            break;
        }
        default:
            break;
    }
}
/**
 * On {@code ALL_IDLE}, logs the remote address, closes the channel and queues an IDLE event
 * when {@code nettyRemotingAbstract} reports an event listener. The user event is always
 * passed on to the next handler.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    final boolean allIdle = evt instanceof IdleStateEvent
            && ((IdleStateEvent) evt).state().equals(IdleState.ALL_IDLE);

    if (allIdle) {
        final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", peer);
        closeChannel(ctx.channel());
        if (nettyRemotingAbstract.hasEventListener()) {
            NettyEvent idleEvent = new NettyEvent(NettyEventType.IDLE, peer.toString(), ctx.channel());
            nettyRemotingAbstract.putNettyEvent(idleEvent);
        }
    }

    ctx.fireUserEventTriggered(evt);
}
/**
 * Dispatches an idle notification to the matching {@code handle*Idle} hook; any other event
 * is delegated to the superclass.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception if a hook or the superclass handler fails
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (!(evt instanceof IdleStateEvent)) {
        super.userEventTriggered(ctx, evt);
        return;
    }

    switch (((IdleStateEvent) evt).state()) {
        case READER_IDLE:
            handleReaderIdle(ctx);
            break;
        case WRITER_IDLE:
            handleWriterIdle(ctx);
            break;
        case ALL_IDLE:
            handleAllIdle(ctx);
            break;
        default:
            break;
    }
}
/**
 * On {@code ALL_IDLE}, sends a heartbeat request built by the configured
 * {@code HeartBeatFactory}; when the write succeeds the client's error count is reset.
 * The user event is always passed on to the next handler.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    final boolean allIdle = evt instanceof IdleStateEvent
            && ((IdleStateEvent) evt).state().equals(IdleState.ALL_IDLE);

    if (allIdle) {
        final String peer = RemotingUtil.parseChannelRemoteAddr(ctx.channel());
        logger.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", peer);

        // The factory implementation is chosen by the heartbeatFactory config entry.
        final HeartBeatFactory factory = SpiClassLoader.getClassLoader(HeartBeatFactory.class)
                .getExtension(config.getExt(
                        ConfigEnum.heartbeatFactory.getName(),
                        ConfigEnum.heartbeatFactory.getValue()));

        final ChannelFutureListener resetErrorsOnSuccess = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    client.resetErrorCount();
                }
            }
        };
        ctx.writeAndFlush(factory.createRequest()).addListener(resetErrorsOnSuccess);
    }

    ctx.fireUserEventTriggered(evt);
}
/**
 * Passes the channel to {@code dataCenterChannelStore} and answers every idle notification,
 * whichever side went idle, with a {@code PING}. Non-idle events are delegated to the
 * superclass.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception if the superclass handler fails
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    Channel channel = ctx.channel();
    dataCenterChannelStore.isDcChannelToSave(channel);

    if (!IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
        super.userEventTriggered(ctx, evt);
        return;
    }

    switch (((IdleStateEvent) evt).state()) {
        case ALL_IDLE:
        case READER_IDLE:
        case WRITER_IDLE:
            // Send a heartbeat.
            channel.writeAndFlush(PING);
            break;
        default:
            break;
    }
}
/**
 * On {@code ALL_IDLE}, logs the remote address, closes the channel and, if a channel event
 * listener exists, queues an IDLE event. The user event is always passed on afterwards.
 *
 * @param ctx context of this handler
 * @param evt the user event that was fired
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        final IdleState idleState = ((IdleStateEvent) evt).state();
        if (idleState.equals(IdleState.ALL_IDLE)) {
            final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", peer);
            closeChannel(ctx.channel());

            final boolean hasListener = NettyRemotingClient.this.channelEventListener != null;
            if (hasListener) {
                NettyRemotingClient.this.putNettyEvent(
                        new NettyEvent(NettyEventType.IDLE, peer.toString(), ctx.channel()));
            }
        }
    }
    ctx.fireUserEventTriggered(evt);
}