下面列出了怎么用io.netty.util.TimerTask的API类实例代码及写法,或者点击链接到github查看源代码。
public void track(PlatformMessage request, long timeout, TimeUnit unit, Action<V> action) {
final String corr = request.getCorrelationId();
if (StringUtils.isEmpty(corr)) {
return;
}
Action<V> old = inflight.put(corr, action);
if (old != null) {
log.warn("conflicting requests correlation ids, terminating out old request via timeout handler");
doTimeout(old);
}
timer.newTimeout(new TimerTask() {
@Override
public void run(@Nullable Timeout to) {
Action<V> match = inflight.remove(corr);
if (match != null) {
log.info("timed out waiting for response to {}", corr);
doTimeout(match);
}
}
}, timeout, unit);
}
@Override
public void subscribe(SubscribeInfo subscribeInfo, final NotifyListener listener) {
namingServiceTimer.newTimeout(
new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
try {
List<ServiceInstance> currentInstances = lookup(null);
Collection<ServiceInstance> addList = CollectionUtils.subtract(
currentInstances, lastInstances);
Collection<ServiceInstance> deleteList = CollectionUtils.subtract(
lastInstances, currentInstances);
listener.notify(addList, deleteList);
lastInstances = currentInstances;
} catch (Exception ex) {
// ignore exception
}
namingServiceTimer.newTimeout(this, updateInterval, TimeUnit.MILLISECONDS);
}
},
updateInterval, TimeUnit.MILLISECONDS);
}
public TokenBucketRateLimiter(int bucketSize, int tokenInputRate) {
if (bucketSize <= 0 || tokenInputRate <= 0) {
throw new IllegalArgumentException("bucketSize and rate must be positive!");
}
this.bucketSize = bucketSize;
this.tokenPerInterval = tokenInputRate / (1000 / timeIntervalMs);
if (this.tokenPerInterval == 0) {
this.tokenPerInterval = 1;
}
this.currentToken = new AtomicInteger(bucketSize);
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
supply();
timer.newTimeout(this, timeIntervalMs, TimeUnit.MILLISECONDS);
}
}, timeIntervalMs, TimeUnit.MILLISECONDS);
}
@Override
public void subscribe(SubscribeInfo subscribeInfo, final NotifyListener listener) {
namingServiceTimer.newTimeout(
new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
try {
List<ServiceInstance> currentInstances = lookup(subscribeInfo);
Collection<ServiceInstance> addList = CollectionUtils.subtract(
currentInstances, lastInstances);
Collection<ServiceInstance> deleteList = CollectionUtils.subtract(
lastInstances, currentInstances);
listener.notify(addList, deleteList);
lastInstances = currentInstances;
} catch (Exception ex) {
// ignore exception
}
namingServiceTimer.newTimeout(this, updateInterval, TimeUnit.MILLISECONDS);
}
},
updateInterval, TimeUnit.MILLISECONDS);
}
public void asyncSendEthereumMessage(
BcosRequest request,
BcosResponseCallback fiscoResponseCallback,
TransactionSucCallback transactionSucCallback) {
this.asyncSendEthereumMessage(request, fiscoResponseCallback);
if (request.getTimeout() > 0) {
final TransactionSucCallback callbackInner = transactionSucCallback;
callbackInner.setTimeout(
timeoutHandler.newTimeout(
new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 处理超时逻辑
callbackInner.onTimeout();
// timeout时清除map的数据,所以尽管后面有回包数据,也会找不到seq->callback的关系
seq2TransactionCallback.remove(request.getMessageID());
}
},
request.getTimeout(),
TimeUnit.MILLISECONDS));
this.seq2TransactionCallback.put(request.getMessageID(), callbackInner);
} else {
this.seq2TransactionCallback.put(request.getMessageID(), transactionSucCallback);
}
}
/**
* 负责客户端引发的超时逻辑。
* @param rsfFuture 开始计时的请求。
*/
private void startRequest(RsfFuture rsfFuture) {
this.requestCount.incrementAndGet();// i++;
this.rsfResponse.put(rsfFuture.getRequest().getRequestID(), rsfFuture);
final RsfRequestFormLocal request = (RsfRequestFormLocal) rsfFuture.getRequest();
TimerTask timeTask = timeoutObject -> {
RsfFuture rsfCallBack = getRequest(request.getRequestID());
/*检测不到说明请求已经被正确响应。*/
if (rsfCallBack == null) {
return;
}
/*异常信息*/
String errorInfo = "request(" + request.getRequestID() + ") -> timeout for client.";
invLogger.error(errorInfo);
/*回应Response*/
putResponse(request.getRequestID(), new RsfTimeoutException(errorInfo));
};
invLogger.info("request({}) -> startRequest, timeout at {} ,bindID ={}, callMethod ={}.", //
request.getRequestID(), request.getTimeout(), request.getBindInfo().getBindID(), request.getMethod());
this.getContext().getEnvironment().atTime(timeTask, request.getTimeout());
}
private void processFollowerTimer(long lastLeaderHeartbeat) {
// .如果系统退出,那么结束定时器循环
if (!this.landStatus.get()) {
return;
}
// .执行 Follower 任务
try {
this.processFollower(lastLeaderHeartbeat);
} catch (Exception e) {
logger.error("Land[Follower] - " + e.getMessage(), e);
}
// .重启定时器
final long curLeaderHeartbeat = this.server.getLastHeartbeat();
this.landContext.atTime(new TimerTask() {
public void run(Timeout timeout) throws Exception {
processFollowerTimer(curLeaderHeartbeat);
}
}, genTimeout());
}
private void processCandidateTimer() {
// .如果系统退出,那么结束定时器循环
if (!this.landStatus.get()) {
return;
}
// .执行 Candidate 任务
try {
this.processCandidate();
} catch (Exception e) {
logger.error("Land[Candidate] - " + e.getMessage(), e);
}
// .重启定时器
this.landContext.atTime(new TimerTask() {
public void run(Timeout timeout) throws Exception {
processCandidateTimer();
}
}, genTimeout());
}
private void processLeaderTimer() {
// .如果系统退出,那么结束定时器循环
if (!this.landStatus.get()) {
return;
}
// .执行 Leader 任务
try {
this.processLeader();
} catch (Exception e) {
logger.error("Land[Leader] - " + e.getMessage(), e);
}
// .重启定时器
this.landContext.atTime(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
processLeaderTimer();
}
}, this.leaderHeartbeat);
}
private void scheduleNextRetry(GeneratedMessageV3 request, int remainingRetryCount) {
final TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.cancel()) {
return;
}
logger.info("Retry {} {}", remainingRetryCount, request);
request(request, remainingRetryCount - 1);
}
};
try {
retryTimer.newTimeout(timerTask, 1000, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
logger.debug("retry fail {}", e.getCause(), e);
}
}
public Timeout newTimeout(TimerTask task, long recycleDelay, TimeUnit unit,
BooleanSupplier checkCondition) {
return super
.newTimeout(new RecycleAsyncTimerTask(task, recycleDelay, unit, checkCondition),
recycleDelay, unit);
}
/**
* @param timerTask
*/
public RecycleAsyncTimerTask(TimerTask timerTask, long recycleDelay, TimeUnit unit,
BooleanSupplier checkCondition) {
super();
this.timerTask = timerTask;
this.recycleDelay = recycleDelay;
this.delayUnit = unit;
this.checkCondition = checkCondition;
}
@Override
protected ScheduledTask doSchedule(Runnable task, Date time, long delay, TimeUnit unit) {
Timeout timo = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
task.run();
}
}, delay, unit);
return new TimeoutScheduledTask(timo);
}
private void delayCloseFtdcChannel(final Channel oldFtdcChannel) {
ApplicationRuntime.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
oldFtdcChannel.close();
}
}, 1);
}
@Override
public void subscribe(final SubscribeInfo subscribeInfo, final NotifyListener listener) {
namingServiceTimer.newTimeout(
new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
try {
File file = new File(filePath);
long currentModified = file.lastModified();
if (currentModified > lastModified) {
List<ServiceInstance> currentInstances = lookup(subscribeInfo);
Collection<ServiceInstance> addList = CollectionUtils.subtract(
currentInstances, lastInstances);
Collection<ServiceInstance> deleteList = CollectionUtils.subtract(
lastInstances, currentInstances);
listener.notify(addList, deleteList);
lastInstances = currentInstances;
}
} catch (Exception ex) {
// ignore exception
}
namingServiceTimer.newTimeout(this, updateInterval, TimeUnit.MILLISECONDS);
}
},
updateInterval, TimeUnit.MILLISECONDS);
}
/**
* constructor
*
* @param maxQps max query per second
*/
public CounterRateLimiter(int maxQps) {
if (maxQps <= 0) {
throw new IllegalArgumentException("maxQps must be positive!");
}
this.maxReqPerInterval = maxQps / (1000 / timeIntervalMs);
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
count.set(0);
timer.newTimeout(this, timeIntervalMs, TimeUnit.MILLISECONDS);
}
}, timeIntervalMs, TimeUnit.MILLISECONDS);
}
public void sendBackAndCompleteNack(final int nextRetryCount, final BaseMessage message, final AckEntry ackEntry) {
final BrokerClusterInfo brokerCluster = brokerService.getClusterBySubject(ClientType.PRODUCER, message.getSubject());
final SendMessageBack.Callback callback = new SendMessageBack.Callback() {
private final int retryTooMuch = brokerCluster.getGroups().size() * 2;
private final AtomicInteger retryNumOnFail = new AtomicInteger(0);
@Override
public void success() {
ackEntry.completed();
}
@Override
public void fail(Throwable e) {
if (retryNumOnFail.incrementAndGet() > retryTooMuch) {
if (e instanceof SendMessageBackException) {
LOGGER.error("send message back fail, and retry {} times after {} seconds. exception: {}", retryNumOnFail.get(), SEND_BACK_DELAY_SECONDS, e.getMessage());
} else {
LOGGER.error("send message back fail, and retry {} times after {} seconds", retryNumOnFail.get(), SEND_BACK_DELAY_SECONDS, e);
}
final SendMessageBack.Callback callback1 = this;
TimerUtil.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
SendMessageBackImpl.this.sendBackAndCompleteNack(message, callback1);
}
}, SEND_BACK_DELAY_SECONDS, TimeUnit.SECONDS);
} else {
if (e instanceof SendMessageBackException) {
LOGGER.error("send message back fail, and retry {} times. exception: {}", retryNumOnFail.get(), SEND_BACK_DELAY_SECONDS, e.getMessage());
} else {
LOGGER.error("send message back fail, and retry {} times", retryNumOnFail.get(), SEND_BACK_DELAY_SECONDS, e);
}
SendMessageBackImpl.this.sendBackAndCompleteNack(message, this);
}
}
};
final BrokerGroupInfo brokerGroup = brokerLoadBalance.loadBalance(brokerCluster, null);
sendBack(brokerGroup, message, callback, ClientType.PRODUCER);
}
public void refreshHeartbeat(T key, TimerTask task, long timeout, TimeUnit unit) {
Timeout context = timer.newTimeout(task, timeout, unit);
final Timeout old = timeouts.put(key, context);
if (old != null && !old.isCancelled() && !old.isExpired()) {
old.cancel();
}
}
public static void main(String[] args) throws InterruptedException {
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(1000, TimeUnit.MILLISECONDS, 16);
hashedWheelTimer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println(System.currentTimeMillis() + " === executed");
}
}, 1, TimeUnit.SECONDS);
}
protected void doSend(final SofaRequest request, AbstractHttpClientHandler callback, final int timeoutMills) {
AbstractByteBuf data = null;
try {
// 序列化
byte serializeType = request.getSerializeType();
Serializer serializer = SerializerFactory.getSerializer(serializeType);
data = serializer.encode(request, null);
request.setData(data);
// 记录请求序列化大小 不是很准,没有记录HTTP头
RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE, data.readableBytes());
// 转换请求
FullHttpRequest httpRequest = convertToHttpRequest(request);
// 发送请求
final int requestId = sendHttpRequest(httpRequest, callback);
if (request.isAsync()) {
TIMEOUT_TIMER.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
Map.Entry<ChannelFuture, AbstractHttpClientHandler> entry = responseChannelHandler
.removePromise(requestId);
if (entry != null) {
ClientHandler handler = entry.getValue();
Exception e = timeoutException(request, timeoutMills, null);
handler.onException(e);
}
}
}, timeoutMills, TimeUnit.MILLISECONDS);
}
} finally {
if (data != null) {
data.release();
}
}
}
public void timerThroughputTest(Control ctrl) throws InterruptedException {
counterDown.set(times);
for (int i = 0; i < times; i++) {
timer.newTimeout((TimerTask) (v) -> counterDown.decrementAndGet(),
delay,
TimeUnit.MILLISECONDS);
}
while (!ctrl.stopMeasurement && counterDown.get() > 0) {
// spin
}
}
private void trySetup(final int attempt) {
log.info("Attempt {}", action);
final ChannelFuture connect = action.setup();
connect.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.info("Successful {}", action);
setChannel(future.channel());
return;
}
final long delay = policy.delay(attempt);
log.warn("Failed {} (attempt: {}), retrying in {}s: {}", action, attempt + 1,
TimeUnit.SECONDS.convert(delay, TimeUnit.MILLISECONDS),
future.cause().getMessage());
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (stopped.get()) {
return;
}
trySetup(attempt + 1);
}
}, delay, TimeUnit.MILLISECONDS);
}
});
}
private void startFollowerTimer() {
if (!this.followerTimer.compareAndSet(false, true)) {
this.logger.error("Land[Follower] - followerTimer -> already started");
return;
}
this.logger.info("Land[Follower] - start followerTimer.");
final long lastLeaderHeartbeat = this.server.getLastHeartbeat();
this.landContext.atTime(new TimerTask() {
public void run(Timeout timeout) throws Exception {
processFollowerTimer(lastLeaderHeartbeat);
}
}, genTimeout());
}
private void startCandidateTimer() {
if (!this.candidateTimer.compareAndSet(false, true)) {
this.logger.error("Land[Candidate] - candidateTimer -> already started");
return;
}
this.logger.info("Land[Candidate] - start candidateTimer.");
this.landContext.atTime(new TimerTask() {
public void run(Timeout timeout) throws Exception {
processCandidateTimer();
}
}, this.genTimeout());
}
private void startLeaderTimer() {
if (!this.leaderTimer.compareAndSet(false, true)) {
this.logger.error("Land[Leader] - leaderTimer -> already started");
return;
}
this.logger.info("Land[Leader] - start leaderTimer.");
this.landContext.atTime(new TimerTask() {
public void run(Timeout timeout) throws Exception {
processLeaderTimer();
}
}, this.leaderHeartbeat);
}
private void sendPing(ChannelHandlerContext ctx) {
RedisConnection connection = RedisConnection.getFrom(ctx.channel());
CommandData<?, ?> commandData = connection.getCurrentCommand();
RFuture<String> future;
if (commandData == null || !commandData.isBlockingCommand()) {
future = connection.async(StringCodec.INSTANCE, RedisCommands.PING);
} else {
future = null;
}
config.getTimer().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (connection.isClosed()) {
return;
}
if (future != null
&& (future.cancel(false) || !future.isSuccess())) {
ctx.channel().close();
if (future.cause() != null) {
log.error("Unable to send PING command over channel: " + ctx.channel(), future.cause());
}
log.debug("channel: {} closed due to PING response timeout set in {} ms", ctx.channel(), config.getPingConnectionInterval());
} else {
sendPing(ctx);
}
}
}, config.getPingConnectionInterval(), TimeUnit.MILLISECONDS);
}
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
try {
return timer.newTimeout(task, delay, unit);
} catch (IllegalStateException e) {
if (isShuttingDown()) {
return DUMMY_TIMEOUT;
}
throw e;
}
}
protected <T> void scheduleCheck(String mapName, RequestId requestId, RPromise<T> cancelRequest) {
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (cancelRequest.isDone()) {
return;
}
RMap<String, T> canceledRequests = getMap(mapName);
RFuture<T> future = canceledRequests.removeAsync(requestId.toString());
future.onComplete((request, ex) -> {
if (cancelRequest.isDone()) {
return;
}
if (ex != null) {
scheduleCheck(mapName, requestId, cancelRequest);
return;
}
if (request == null) {
scheduleCheck(mapName, requestId, cancelRequest);
} else {
cancelRequest.trySuccess(request);
}
});
}
}, 3000, TimeUnit.MILLISECONDS);
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected void scheduleRetryTimeRenewal(String requestId, Long retryInterval) {
if (retryInterval == null) {
return;
}
((Redisson) redisson).getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
renewRetryTime(requestId);
}
}, Math.max(1000, retryInterval / 2), TimeUnit.MILLISECONDS);
}