下面列出了 io.netty.channel.EventLoop#schedule() 的实例代码；您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Reacts to the broker channel becoming inactive.
 *
 * <p>Logs the disconnect and, unless the endpoint channel has been closed, clears its
 * channel future and schedules a reconnect on the channel's event loop after the
 * configured auto-reconnect delay (in seconds). The event is then passed on to the
 * superclass implementation.
 *
 * @param ctx context of the channel that went inactive
 * @throws Exception if the superclass handler fails
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    log.info("Disconnected from broker(addr={})", NettyUtils.parseChannelRemoteAddr(ctx.channel()));

    final boolean endpointStillOpen = !m_endpointChannel.isClosed();
    if (endpointStillOpen) {
        m_endpointChannel.setChannelFuture(null);

        final Runnable reconnectTask = new Runnable() {
            @Override
            public void run() {
                log.info("Reconnecting to broker({}:{})", m_endpoint.getHost(), m_endpoint.getPort());
                m_endpointClient.connect(m_endpoint, m_endpointChannel);
            }
        };
        ctx.channel().eventLoop().schedule(
                reconnectTask, m_config.getEndpointChannelAutoReconnectDelay(), TimeUnit.SECONDS);
    }

    super.channelInactive(ctx);
}
/**
 * Invoked when the PING write has completed.
 *
 * <p>On success, schedules {@code shutdownRunnable} on the channel's event loop after
 * {@code pingIdleTimeNanos}, moves to {@code PENDING_PING_ACK} and resets the stopwatch.
 * On failure, optionally logs the cause and falls back to {@code IDLE} unless the state
 * is already {@code SHUTDOWN}.
 *
 * @param future the completed write future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
        // Mostly because the channel is already closed. So ignore and change state to IDLE.
        // If the channel is closed, we change state to SHUTDOWN on destroy.
        final boolean worthLogging = !future.isCancelled() && Exceptions.isExpected(future.cause());
        if (worthLogging) {
            logger.debug("{} PING write failed", channel, future.cause());
        }
        if (pingState != PingState.SHUTDOWN) {
            pingState = PingState.IDLE;
        }
        return;
    }

    logger.debug("{} PING write successful", channel);
    shutdownFuture = channel.eventLoop().schedule(shutdownRunnable, pingIdleTimeNanos, TimeUnit.NANOSECONDS);
    pingState = PingState.PENDING_PING_ACK;
    resetStopwatch();
}
/**
 * Handles the outcome of a connection attempt to the BMP router.
 *
 * <ul>
 *   <li>Cancelled: only logged.</li>
 *   <li>Succeeded: a listener on the channel's close future calls {@code scheduleConnect()}.</li>
 *   <li>Failed: if the current delay exceeds {@code MAXIMUM_BACKOFF}, gives up; otherwise
 *       schedules another connect after the current delay (milliseconds) and doubles the
 *       delay for the next round.</li>
 * </ul>
 *
 * @param future the completed connect future
 */
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
    if (future.isCancelled()) {
        LOG.debug("Connection {} cancelled!", future);
        return;
    }

    if (future.isSuccess()) {
        LOG.debug("Connection {} succeeded!", future);
        final ChannelFutureListener onClose = closed -> scheduleConnect();
        future.channel().closeFuture().addListener(onClose);
        return;
    }

    // Connection attempt failed.
    if (this.delay > MAXIMUM_BACKOFF) {
        LOG.warn("The time of maximum backoff has been exceeded. No further connection attempts with BMP "
                + "router {}.", this.remoteAddress);
        future.cancel(false);
        return;
    }

    future.channel().eventLoop()
            .schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
    LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
            this.remoteAddress, this.delay);
    // The delay just logged is the one used for this attempt; the next one waits twice as long.
    this.delay *= 2;
}
/**
 * Schedules a new connection attempt after {@code retryTimer} seconds.
 *
 * <p>When {@code retryTimer} is 0 no retry is made and the promise is failed with the
 * cause of the pending future. Otherwise a task is scheduled on the pending channel's
 * event loop; when it runs it either records that the connect was skipped (a peer
 * session is already present) or starts a new connect and stores it as the pending future.
 */
synchronized void reconnect() {
    if (this.retryTimer == 0) {
        LOG.debug("Retry timer value is 0. Reconnection will not be attempted");
        this.setFailure(this.pending.cause());
        return;
    }

    final EventLoop loop = this.pending.channel().eventLoop();
    final Runnable attempt = () -> {
        synchronized (BGPProtocolSessionPromise.this) {
            if (!BGPProtocolSessionPromise.this.peerSessionPresent) {
                BGPProtocolSessionPromise.this.connectSkipped = false;
                LOG.debug("Attempting to connect to {}", BGPProtocolSessionPromise.this.address);
                final ChannelFuture reconnectFuture = BGPProtocolSessionPromise.this.bootstrap.connect();
                reconnectFuture.addListener(new BootstrapConnectListener());
                BGPProtocolSessionPromise.this.pending = reconnectFuture;
            } else {
                LOG.debug("Connection to {} already exists", BGPProtocolSessionPromise.this.address);
                BGPProtocolSessionPromise.this.connectSkipped = true;
            }
        }
    };
    loop.schedule(attempt, this.retryTimer, TimeUnit.SECONDS);

    LOG.debug("Next reconnection attempt in {}s", this.retryTimer);
}
/**
 * Handles the result of a connection attempt to the ovsdb server.
 *
 * <p>On success the new channel is handed to {@code handleNewNodeConnection}. On failure
 * the channel is closed and, while the incremented attempt counter is below
 * {@code MAX_RETRY}, a retry is scheduled one second later; once the limit is reached the
 * failure handler is invoked instead.
 *
 * @param channelFuture the completed connect future
 */
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
    if (channelFuture.isSuccess()) {
        handleNewNodeConnection(channelFuture.channel());
        return;
    }

    channelFuture.channel().close();

    if (count.incrementAndGet() >= MAX_RETRY) {
        failhandler.accept(new Exception("max connection retry(" + MAX_RETRY + ") exceeded"));
        return;
    }

    final Runnable retry = () -> {
        try {
            controller.connectRetry(this.ip, this.port, this);
        } catch (Exception e) {
            log.warn("Connection to the ovsdb server {}:{} failed(cause: {})", ip, port, e);
        }
    };
    channelFuture.channel().eventLoop().schedule(retry, 1L, TimeUnit.SECONDS);
}
/**
 * Handles the result of the switch-to-controller connection attempt.
 *
 * <p>Success is logged. On failure the retry counter is incremented; if its previous
 * value was already above {@code MAX_RETRY} the failure is logged, otherwise
 * {@code connect} is scheduled again after one second.
 *
 * @param future the completed connect future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isSuccess()) {
        log.info(String.format(MSG_STATE,
                ofSwitch.dpid(),
                MSG_CONNECTED,
                controller.ip(),
                controller.port()));
        // FIXME add close future listener to handle connection lost
        return;
    }

    final boolean retriesExhausted = retryCount.getAndIncrement() > MAX_RETRY;
    if (retriesExhausted) {
        log.warn(String.format(MSG_STATE,
                ofSwitch.dpid(),
                MSG_FAILED,
                controller.ip(),
                controller.port()));
        return;
    }

    future.channel().eventLoop().schedule(this::connect, 1L, TimeUnit.SECONDS);
}
/**
 * Arms a one-shot retransmission timer on the given event loop.
 *
 * <p>When the timer fires, the timeout grows by 5 seconds, a copy of the original
 * message's fixed header is built with the second constructor argument set to
 * {@code true}, the handler is invoked with that header and the original message, and
 * the timer is armed again with the enlarged timeout.
 *
 * @param eventLoop the event loop on which the timer task is scheduled
 */
private void startTimer(EventLoop eventLoop){
    final Runnable retransmit = () -> {
        this.timeout += 5;

        final MqttFixedHeader source = this.originalMessage.fixedHeader();
        final MqttFixedHeader fixedHeader = new MqttFixedHeader(
                source.messageType(),
                true,
                source.qosLevel(),
                source.isRetain(),
                source.remainingLength());

        handler.accept(fixedHeader, originalMessage);
        startTimer(eventLoop);
    };
    this.timer = eventLoop.schedule(retransmit, timeout, TimeUnit.SECONDS);
}
/**
 * Handles the result of the client's connection attempt.
 *
 * <p>On success the channel is published through {@code ChannelHolder}, subscriptions are
 * recovered via {@code SubRecorder.recover()}, and the delay and retry counter are reset.
 * On failure the channel is closed and, unless the event loop is shutting down, a retry
 * task is scheduled after {@code dalayTime} seconds; the delay is then doubled. The
 * retry task restarts the client while the retry count is within {@code TRY_LIMITE} and
 * stops it otherwise.
 *
 * <p>NOTE(review): the log message literals appear mis-encoded in this copy of the file;
 * they are left untouched here.
 *
 * @param future the completed connect future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isSuccess()) {
        LOGGER.info("���ͻ���״̬��STATUS=ACTIVE,TIME="+ DateUtils.getCurrentDateTime());
        ChannelHolder.setChannel(future.channel());
        // A disabled block that re-sent the last login token to the broker used to sit here.
        // Restore recorded subscriptions after a successful (re)connect.
        SubRecorder.recover();
        dalayTime = 1;
        retryCount.set(0);
        return;
    }

    EventLoop eventExecutors = future.channel().eventLoop();
    future.channel().close();
    if (eventExecutors.isShuttingDown()) {
        return;
    }

    final Runnable retry = () -> {
        if (retryCount.get() > TRY_LIMITE) {
            NettyClient.INSTANCE.stop();
            LOGGER.error("���������桿�ѳ��������������������ر�");
            return;
        }
        LOGGER.error("���ͻ���״̬��STATUS=failed,TIME="+ DateUtils.getCurrentDateTime()+",msg=���ڳ�������,retrys="+retryCount.getAndIncrement());
        NettyClient.INSTANCE.start();
    };
    eventExecutors.schedule(retry, dalayTime, TimeUnit.SECONDS);
    // Each failed attempt doubles the wait before the next one.
    dalayTime <<= 1;
}
/**
 * Reacts to the channel becoming inactive by scheduling {@code NettyClient.start()} on
 * the channel's event loop one second later, then forwards the inactive event to the
 * next handler. A failure of the restart is logged, not rethrown.
 *
 * <p>NOTE(review): the log message literals appear mis-encoded in this copy of the file;
 * they are left untouched here.
 *
 * @param ctx context of the channel that went inactive
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) {
    log.info("�����ѶϿ������ڳ�������...");

    final Runnable restart = () -> {
        try {
            NettyClient.start();
        } catch (Exception e) {
            log.error("���ӳ����쳣�����ڳ�������...",e);
        }
    };
    ctx.channel().eventLoop().schedule(restart, 1, TimeUnit.SECONDS);

    ctx.fireChannelInactive();
}
/**
 * Schedules {@code reconnect(ctx)} on the channel's event loop, or tears everything down
 * if this client has been closed.
 *
 * @param ctx  context whose channel supplies the event loop
 * @param time delay before the reconnect attempt, in seconds
 */
private void scheduleReconnect(final ChannelHandlerContext ctx, int time) {
    if (!closed) {
        final Runnable reconnectTask = new Runnable() {
            @Override
            public void run() {
                reconnect(ctx);
            }
        };
        ctx.channel().eventLoop().schedule(reconnectTask, time, TimeUnit.SECONDS);
    } else {
        closeChannelAndEventLoop(ctx.channel());
    }
}
/**
 * Called when the channel is unregistered from its event loop.
 *
 * <p>Announces the pause, then schedules a task on the same event loop that, after
 * {@code UptimeClient.RECONNECT_DELAY} seconds, configures a fresh {@link Bootstrap}
 * with that loop and connects again.
 *
 * @param ctx context of the unregistered channel
 */
@Override
public void channelUnregistered(final ChannelHandlerContext ctx) throws Exception {
    println("Sleeping for: " + UptimeClient.RECONNECT_DELAY + 's');

    final EventLoop loop = ctx.channel().eventLoop();
    final Runnable reconnectTask = new Runnable() {
        @Override
        public void run() {
            println("Reconnecting to: " + UptimeClient.HOST + ':' + UptimeClient.PORT);
            UptimeClient.connect(UptimeClient.configureBootstrap(new Bootstrap(), loop));
        }
    };
    loop.schedule(reconnectTask, UptimeClient.RECONNECT_DELAY, TimeUnit.SECONDS);
}
/**
 * Queues a delayed call to {@code reconnect(ctx)} on the event loop of the context's
 * channel. If the {@code closed} flag is set, the channel and event loop are closed
 * instead and nothing is scheduled.
 *
 * @param ctx  handler context used for the reconnect and to obtain the event loop
 * @param time number of seconds to wait before reconnecting
 */
private void scheduleReconnect(final ChannelHandlerContext ctx, int time) {
    if (closed) {
        closeChannelAndEventLoop(ctx.channel());
    } else {
        final EventLoop executor = ctx.channel().eventLoop();
        final Runnable delayedReconnect = new Runnable() {
            @Override
            public void run() {
                reconnect(ctx);
            }
        };
        executor.schedule(delayedReconnect, time, TimeUnit.SECONDS);
    }
}
/**
 * Retries the client start one second after a failed connection attempt; a successful
 * attempt needs no action here.
 *
 * @param future the completed connect future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isSuccess()) {
        return;
    }
    future.channel().eventLoop().schedule(client::start, 1L, TimeUnit.SECONDS);
}
/**
 * When the channel goes inactive, schedules a task on its event loop that, one second
 * later, asks the client to create a bootstrap from a fresh {@link Bootstrap} and that
 * same loop. The event is then passed to the superclass implementation.
 *
 * @param ctx context of the channel that went inactive
 * @throws Exception if the superclass handler fails
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    final EventLoop eventLoop = ctx.channel().eventLoop();
    final Runnable rebuild = new Runnable() {
        @Override
        public void run() {
            client.createBootstrap(new Bootstrap(), eventLoop);
        }
    };
    eventLoop.schedule(rebuild, 1L, TimeUnit.SECONDS);

    super.channelInactive(ctx);
}
/**
 * Handles the outcome of a connection attempt to the BMP router.
 *
 * <p>A cancelled attempt is only logged. After a successful attempt a listener on the
 * channel's close future calls {@code scheduleConnect()}. After a failed attempt another
 * connect, with this listener attached, is scheduled after {@code delay} milliseconds.
 *
 * @param future the completed connect future
 */
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
    if (future.isCancelled()) {
        LOG.debug("Connection {} cancelled!", future);
        return;
    }

    if (future.isSuccess()) {
        LOG.debug("Connection {} succeeded!", future);
        final ChannelFutureListener onClose = closed -> scheduleConnect();
        future.channel().closeFuture().addListener(onClose);
        return;
    }

    future.channel().eventLoop()
            .schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
    LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
            this.remoteAddress, this.delay);
}
/**
 * Resolves a finished connection attempt for the enclosing {@code PCCReconnectPromise}.
 * The whole method runs while holding {@code this.lock}.
 *
 * <ul>
 *   <li>Promise cancelled: a channel that did connect is closed; nothing else happens.</li>
 *   <li>Connected: only logged.</li>
 *   <li>Failed: with a retry timer of 0 the promise is failed with the connect cause;
 *       otherwise a new connect is scheduled after {@code retryTimer} seconds, with this
 *       listener attached, and stored as the promise's pending future.</li>
 * </ul>
 *
 * @param cf the completed connect future
 */
@Override
public void operationComplete(final ChannelFuture cf) {
    synchronized (this.lock) {
        if (PCCReconnectPromise.this.isCancelled()) {
            if (cf.isSuccess()) {
                PCCReconnectPromise.LOG.debug("Closing channels for cancelled promise {}",
                        PCCReconnectPromise.this);
                cf.channel().close();
            }
            return;
        }

        if (cf.isSuccess()) {
            PCCReconnectPromise.LOG.debug("Promise connection is successful.");
            return;
        }

        PCCReconnectPromise.LOG.debug("Attempt to connect to {} failed",
                PCCReconnectPromise.this.address, cf.cause());

        if (PCCReconnectPromise.this.retryTimer == 0) {
            PCCReconnectPromise.LOG.debug("Retry timer value is 0. Reconnection will not be attempted");
            PCCReconnectPromise.this.setFailure(cf.cause());
            return;
        }

        final EventLoop loop = cf.channel().eventLoop();
        final Runnable reconnectAttempt = () -> {
            // Runs later on the event loop, so it takes the promise's monitor itself.
            synchronized (PCCReconnectPromise.this) {
                PCCReconnectPromise.LOG.debug("Attempting to connect to {}",
                        PCCReconnectPromise.this.address);
                final Future<Void> reconnectFuture = PCCReconnectPromise.this.bootstrap.connect();
                reconnectFuture.addListener(this);
                PCCReconnectPromise.this.pending = reconnectFuture;
            }
        };
        loop.schedule(reconnectAttempt, PCCReconnectPromise.this.retryTimer, TimeUnit.SECONDS);

        PCCReconnectPromise.LOG.debug("Next reconnection attempt in {}s",
                PCCReconnectPromise.this.retryTimer);
    }
}
/**
 * Schedules a reconnect of the client on the channel's event loop after
 * {@code RECONNECT_DELAY_SEC} seconds.
 *
 * @param ctx context whose channel supplies the event loop
 */
private void scheduleReconnect(final ChannelHandlerContext ctx) {
    final Runnable reconnectTask = new Runnable() {
        @Override
        public void run() {
            log.info("Reconnecting to {}", graphiteTarget);
            client.connect();
        }
    };
    ctx.channel().eventLoop().schedule(reconnectTask, RECONNECT_DELAY_SEC, TimeUnit.SECONDS);
}
/**
 * Checks that the resolver group's cache becomes empty once the DNS server stops
 * answering and the refresh backoff is limited to a single attempt, and that a later
 * resolve of the same host fails with {@link UnknownHostException}.
 */
@Test
void removedWhenExceedingBackoffMaxAttempts() throws Exception {
// The test DNS server initially answers the A query for foo.com. with 1.1.1.1
// (the trailing 1 passed to newAddressRecord is presumably the TTL -- confirm).
try (TestDnsServer server = new TestDnsServer(ImmutableMap.of(
new DefaultDnsQuestion("foo.com.", A),
new DefaultDnsResponse(0).addRecord(ANSWER, newAddressRecord("foo.com.", "1.1.1.1", 1))))
) {
final EventLoop eventLoop = eventLoopExtension.get();
final DnsResolverGroupBuilder builder = builder(server);
// Limit the refresh backoff to one attempt.
builder.refreshBackoff(Backoff.ofDefault().withMaxAttempts(1));
try (RefreshingAddressResolverGroup group = builder.build(eventLoop)) {
final AddressResolver<InetSocketAddress> resolver = group.getResolver(eventLoop);
// Start of the interval checked by the elapsed-time assertion below.
final long start = System.nanoTime();
final Future<InetSocketAddress> foo = resolver.resolve(
InetSocketAddress.createUnresolved("foo.com", 36462));
// The first resolution must succeed with the address served above.
await().untilAsserted(() -> assertThat(foo.isSuccess()).isTrue());
assertThat(foo.getNow().getAddress().getHostAddress()).isEqualTo("1.1.1.1");
// Replace the server's responses with an empty map, so it no longer has an answer configured.
server.setResponses(ImmutableMap.of());
// Schedule resolve() every 500 millis to keep cache hits greater than 0.
for (int i = 1; i <= 4; i++) {
eventLoop.schedule(
() -> resolver.resolve(InetSocketAddress.createUnresolved("foo.com", 36462)),
500 * i, TimeUnit.MILLISECONDS);
}
final ConcurrentMap<String, CompletableFuture<CacheEntry>> cache = group.cache();
// Wait for the cache to become empty.
await().until(cache::isEmpty);
// The cache must not have emptied earlier than roughly one second after the start.
assertThat(System.nanoTime() - start).isGreaterThanOrEqualTo(
(long) (TimeUnit.SECONDS.toNanos(1) * 0.9)); // buffer (90%)
// With the cache empty and no answer configured, a new resolve is expected to fail.
final Future<InetSocketAddress> future = resolver.resolve(
InetSocketAddress.createUnresolved("foo.com", 36462));
await().until(future::isDone);
assertThat(future.cause()).isInstanceOf(UnknownHostException.class);
}
}
}
/**
 * Schedules a single {@code HeartBeatTask} for the given instance on the event loop and
 * stores the resulting future in {@code heartBeatFuture}. The delay is the renewal
 * interval, in seconds, taken from the instance's lease info.
 *
 * @param eventLoop the event loop that will run the heartbeat task
 * @param newInfo   the instance info the heartbeat is for
 */
private void scheduleHeartBeat(EventLoop eventLoop, InstanceInfo newInfo) {
    final HeartBeatTask heartBeat = new HeartBeatTask(eventLoop, newInfo);
    heartBeatFuture = eventLoop.schedule(
            heartBeat, newInfo.getLeaseInfo().getRenewalIntervalInSecs(), TimeUnit.SECONDS);
}
/**
 * Resolves a finished connection attempt for the enclosing
 * {@code PCEPProtocolSessionPromise}, while holding that promise's monitor.
 *
 * <p>The completed future must be the promise's current pending future, otherwise an
 * {@link IllegalStateException} is raised by the precondition check.
 *
 * <ul>
 *   <li>Promise cancelled: a channel that did connect is closed; nothing else happens.</li>
 *   <li>Connected: only logged.</li>
 *   <li>Failed: with a retry timer of 0 the promise is failed with the connect cause;
 *       otherwise a new connect is scheduled after {@code retryTimer} seconds, with this
 *       listener attached, and stored as the promise's pending future.</li>
 * </ul>
 *
 * @param cf the completed connect future
 */
@Override
public void operationComplete(final ChannelFuture cf) {
    synchronized (PCEPProtocolSessionPromise.this) {
        PCEPProtocolSessionPromise.LOG.debug("Promise {} connection resolved",
                PCEPProtocolSessionPromise.this);
        Preconditions.checkState(PCEPProtocolSessionPromise.this.pending.equals(cf));

        if (PCEPProtocolSessionPromise.this.isCancelled()) {
            if (cf.isSuccess()) {
                PCEPProtocolSessionPromise.LOG.debug("Closing channel for cancelled promise {}",
                        PCEPProtocolSessionPromise.this);
                cf.channel().close();
            }
            return;
        }

        if (cf.isSuccess()) {
            PCEPProtocolSessionPromise.LOG.debug("Promise {} connection successful",
                    PCEPProtocolSessionPromise.this);
            return;
        }

        PCEPProtocolSessionPromise.LOG.debug("Attempt to connect to {} failed",
                PCEPProtocolSessionPromise.this.address, cf.cause());

        if (PCEPProtocolSessionPromise.this.retryTimer == 0) {
            PCEPProtocolSessionPromise.LOG
                    .debug("Retry timer value is 0. Reconnection will not be attempted");
            PCEPProtocolSessionPromise.this.setFailure(cf.cause());
            return;
        }

        final EventLoop loop = cf.channel().eventLoop();
        final Runnable reconnectAttempt = () -> {
            // Runs later on the event loop, so it re-acquires the promise's monitor.
            synchronized (PCEPProtocolSessionPromise.this) {
                PCEPProtocolSessionPromise.LOG.debug("Attempting to connect to {}",
                        PCEPProtocolSessionPromise.this.address);
                final Future<Void> reconnectFuture = PCEPProtocolSessionPromise.this.bootstrap.connect();
                reconnectFuture.addListener(BootstrapConnectListener.this);
                PCEPProtocolSessionPromise.this.pending = reconnectFuture;
            }
        };
        loop.schedule(reconnectAttempt, PCEPProtocolSessionPromise.this.retryTimer, TimeUnit.SECONDS);

        PCEPProtocolSessionPromise.LOG.debug("Next reconnection attempt in {}s",
                PCEPProtocolSessionPromise.this.retryTimer);
    }
}