下面列出了 com.google.common.util.concurrent.RateLimiter#tryAcquire() 的实例代码,您也可以点击链接到 GitHub 查看源代码,或在右侧发表评论。
/**
 * Attempts to take one permit for the given limit entity, waiting up to 2000 milliseconds.
 *
 * @param limitEntity the request descriptor; its key is recorded as the result URL and its
 *                    identifier is written to the log
 * @return {@code null} when {@code getRateLimiter} yields no limiter for the entity; otherwise a
 *         result whose type is {@code SUCCESS} if the permit was obtained and {@code FAIL} if not
 */
@Override
public LimitResult tryAccess(LimitEntity limitEntity) {
    RateLimiter rateLimiter = getRateLimiter(limitEntity);
    if (rateLimiter == null) {
        return null;
    }
    LimitResult limitResult = new LimitResult();
    limitResult.setUrl(limitEntity.getKey());
    // NOTE(review): this writes the entity's identifier back onto the entity itself. It may have
    // been meant to populate limitResult instead -- confirm against LimitResult before changing.
    limitEntity.setIdentifier(limitEntity.getIdentifier());
    boolean access = rateLimiter.tryAcquire(1, 2000, TimeUnit.MILLISECONDS);
    // Every value is passed as a logging argument: concatenating the identifier/URL into the
    // format string let a "{}" inside either of them swallow the access flag.
    log.info("identifier:{} url:{} access:{}", limitEntity.getIdentifier(), limitResult.getUrl(), access);
    limitResult.setResultType(access ? LimitResult.ResultType.SUCCESS : LimitResult.ResultType.FAIL);
    return limitResult;
}
/**
 * Checks the produce quotas of a topic partition: first the message-count limiter, then the
 * byte limiter. A Cat event is logged for whichever limiter rejects the request.
 *
 * @param topic     topic name
 * @param partition partition number
 * @param msgCount  number of permits requested from the QPS limiter
 * @param bytes     number of permits requested from the bytes limiter
 * @return {@code false} when both limiters grant the permits, {@code true} otherwise
 */
private boolean isRateLimitExceeded(String topic, int partition, int msgCount, int bytes) {
    // Both limiters are looked up before either one is consulted.
    RateLimiter messageLimiter = m_config.getPartitionProduceQPSRateLimiter(topic, partition);
    RateLimiter byteLimiter = m_config.getPartitionProduceBytesRateLimiter(topic, partition);

    if (!messageLimiter.tryAcquire(msgCount)) {
        Cat.logEvent(CatConstants.TYPE_MESSAGE_BROKER_QPS_RATE_LIMIT_EXCEED, topic + "-" + partition, Event.SUCCESS,
                "msgCount=" + msgCount + "&bytes=" + bytes);
        return true;
    }

    if (!byteLimiter.tryAcquire(bytes)) {
        Cat.logEvent(CatConstants.TYPE_MESSAGE_BROKER_BYTES_RATE_LIMIT_EXCEED, topic + "-" + partition,
                Event.SUCCESS, "msgCount=" + msgCount + "&bytes=" + bytes);
        return true;
    }

    return false;
}
/**
 * Tells whether producing {@code msgCount} messages totalling {@code bytes} bytes to the given
 * topic partition is rejected by its QPS limiter or, failing that, by its bytes limiter.
 * The rejecting limiter is reported through a single Cat event.
 *
 * @param topic     topic name
 * @param partition partition number
 * @param msgCount  permits requested from the QPS limiter
 * @param bytes     permits requested from the bytes limiter
 * @return {@code true} if either limiter refuses, {@code false} if both accept
 */
private boolean isRateLimitExceeded(String topic, int partition, int msgCount, int bytes) {
    RateLimiter qps = m_config.getPartitionProduceQPSRateLimiter(topic, partition);
    RateLimiter throughput = m_config.getPartitionProduceBytesRateLimiter(topic, partition);

    // Work out which limiter (if any) refused; the bytes limiter is only asked once QPS passed.
    final String exceededEventType;
    if (!qps.tryAcquire(msgCount)) {
        exceededEventType = CatConstants.TYPE_MESSAGE_BROKER_QPS_RATE_LIMIT_EXCEED;
    } else if (!throughput.tryAcquire(bytes)) {
        exceededEventType = CatConstants.TYPE_MESSAGE_BROKER_BYTES_RATE_LIMIT_EXCEED;
    } else {
        return false;
    }

    Cat.logEvent(exceededEventType, topic + "-" + partition, Event.SUCCESS,
            "msgCount=" + msgCount + "&bytes=" + bytes);
    return true;
}
/**
 * Applies the per-partition produce limits. The QPS limiter is consulted first and the bytes
 * limiter only if the QPS limiter granted its permits; a refusal is logged as a Cat event.
 *
 * @param topic     topic name
 * @param partition partition number
 * @param msgCount  permits to request from the QPS limiter
 * @param bytes     permits to request from the bytes limiter
 * @return {@code true} when a limit is exceeded, {@code false} when both limiters accept
 */
private boolean isRateLimitExceeded(String topic, int partition, int msgCount, int bytes) {
    RateLimiter countLimiter = m_config.getPartitionProduceQPSRateLimiter(topic, partition);
    RateLimiter sizeLimiter = m_config.getPartitionProduceBytesRateLimiter(topic, partition);

    boolean countAccepted = countLimiter.tryAcquire(msgCount);
    // Short-circuit keeps the bytes limiter untouched when the count was already refused.
    boolean sizeAccepted = countAccepted && sizeLimiter.tryAcquire(bytes);
    if (sizeAccepted) {
        return false;
    }

    Cat.logEvent(
            countAccepted
                    ? CatConstants.TYPE_MESSAGE_BROKER_BYTES_RATE_LIMIT_EXCEED
                    : CatConstants.TYPE_MESSAGE_BROKER_QPS_RATE_LIMIT_EXCEED,
            topic + "-" + partition, Event.SUCCESS, "msgCount=" + msgCount + "&bytes=" + bytes);
    return true;
}
/**
 * Demo entry point: queues a {@code HelloCommand} in an endless loop, throttled by a limiter
 * created with a rate of 100 permits per second, and prints each result to standard output.
 * The loop only ends when an exception is thrown.
 *
 * @param args unused
 */
public static void main(String[] args) {
    try {
        RateLimiter limit = RateLimiter.create(100d);
        PropertyConfigurator.configure("conf/log4j.properties");
        while (true) {
            // Block until the next permit is due. The previous loop polled tryAcquire() without
            // pausing, which kept a core fully busy while waiting for a permit.
            limit.acquire();
            final HelloCommand command = new HelloCommand();
            Future<String> future = command.queue();
            System.out.println("result: " + future.get());
            System.out.println("");
        }
    } catch (InterruptedException e) {
        // future.get() was interrupted: restore the flag so the interruption stays observable.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Creates a dequeue throttle backed by a {@code RateLimiter}.
 *
 * @param maxRequestsPerSecond rate handed to {@code RateLimiter.create}; values less than or
 *                             equal to zero select {@code ALLOW_NONE} instead
 * @return {@code ALLOW_NONE} for a non-positive rate, otherwise an {@code IFace} that delegates
 *         to the newly created limiter
 */
public static IFace create(double maxRequestsPerSecond) {
    if (maxRequestsPerSecond <= 0.0) {
        return ALLOW_NONE;
    }

    final RateLimiter limiter = RateLimiter.create(maxRequestsPerSecond);
    IFace throttled = new IFace() {
        @Override
        public double getRate() {
            return limiter.getRate();
        }

        @Override
        public boolean allowDequeue(int numJobs) {
            // One permit per job; no waiting.
            return limiter.tryAcquire(numJobs);
        }
    };
    return throttled;
}
/**
 * Records a hit for the table, publishes the quota utilization gauge when broker metrics are
 * available, and then tries to take one token from the table's rate limiter.
 *
 * @param tableNameWithType table name with type, used for the gauge and the log message.
 * @param queryQuotaEntity query quota entity supplying the hit counter and the rate limiter.
 * @return true if a token was acquired, false if the limiter refused.
 */
private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity queryQuotaEntity) {
    // Register this request with the hit counter before reading the count back.
    queryQuotaEntity.getHitCounter().hit();

    final RateLimiter limiter = queryQuotaEntity.getRateLimiter();
    final double perBrokerRate = limiter.getRate();
    final int numHits = queryQuotaEntity.getHitCounter().getHitCount();

    // The utilization gauge is only emitted when broker metrics are present.
    if (_brokerMetrics != null) {
        int percentageOfCapacityUtilization = (int) (numHits * 100 / perBrokerRate);
        LOGGER.debug("The percentage of rate limit capacity utilization is {}", percentageOfCapacityUtilization);
        _brokerMetrics.setValueOfTableGauge(tableNameWithType, BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE,
                percentageOfCapacityUtilization);
    }

    if (limiter.tryAcquire()) {
        return true;
    }

    LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. Current qps: {}", tableNameWithType,
            perBrokerRate, numHits);
    return false;
}
/**
 * Registers a new user.
 *
 * <p>The steps run in this order: take a permit from the registration rate limiter, reject
 * callers that already have a login context, validate the input, reject an existing user name,
 * assign a role code (falling back to {@code RoleCodeEnum.GUEST_USER}), then build and persist
 * the user. Any exception is logged and turned into a failed result.
 *
 * @param loginUserVo registration data; its role code is overwritten by this method
 * @return the result object; on success it carries the new user with the MD5 password cleared
 */
@Override
public CommonResult<AcUser> register(LoginUserVo loginUserVo) {
    CommonResult outcome = CommonResult.create();
    try {
        // The permit is requested first, before any other check.
        if (!rateLimiterProvider.getRegisterRateLimiter().tryAcquire()) {
            return outcome.failed("注册繁忙,超过最大注册数量,请稍候重试!");
        }
        if (LoginHelper.getLoginContext() != null) {
            return outcome.failed("您已经登录成功,请勿重复注册!");
        }

        acUserValidator.validateForRegister(loginUserVo);
        AcUser existingUser = acUserManager.getByUserName(loginUserVo.getUserName());
        if (existingUser != null) {
            return outcome.failed("用户已存在!");
        }

        // Use the configured registration role, or the guest role when none is configured.
        loginUserVo.setRoleCode(Profiles.getInstance().getRegisterRoleCode());
        if (StringUtils.isEmpty(loginUserVo.getRoleCode())) {
            loginUserVo.setRoleCode(RoleCodeEnum.GUEST_USER.getCode());
        }

        AcUser createdUser = acUserBuilder.buildForRegister(loginUserVo);
        CommonResult addOutcome = super.add(createdUser);
        if (!addOutcome.isSuccess()) {
            outcome.failed(addOutcome.getMessage());
        } else {
            // Clear the MD5 password before the user object is attached to the result.
            createdUser.setPasswordMd5(null);
            outcome.addDefaultModel(createdUser);
            outcome.succeed();
        }
    } catch (Exception e) {
        LogHelper.EXCEPTION.error("注册用户异常,loginUserVo={}", loginUserVo, e);
        outcome.failed("注册用户异常,原因是:" + e.getMessage());
    }
    return outcome;
}
/**
 * Decides whether one more call for {@code group} may enter.
 *
 * <p>The group's own limiter is tried first. If it refuses, a warning is logged, the warn
 * counter is incremented and the group's warn limiter is tried instead. After the group check
 * passes, the total limiter is tried, with the total warn limiter as its fallback.
 *
 * @param group group name used as key into the limit and limiter maps
 * @return {@code true} if the call is admitted; {@code false} if the group is unknown, a limiter
 *         is missing, or the limiters refuse
 */
@Override
public boolean tryEnter(String group) {
    if (!groupLimits.containsKey(group)) {
        LogUtils.logError("TpsLimiter::tryEnter", "can not find group in groupLimits, Group =" + group);
        return false;
    }

    RateLimiter groupLimiter = groupCount.get(group);
    if (groupLimiter == null) {
        LogUtils.logError("TpsLimiter::tryEnter", "can not find group in groupCount, Group =" + group);
        return false;
    }

    if (!groupLimiter.tryAcquire()) {
        LogUtils.logWarn("TpsLimiter::tryEnter",
                String.format("Group %s TPS is over warning threshold(%f)!", group, groupLimits.get(group) * warningRatio));
        MetricUtils.incWarnLimitCounter(group);

        // The regular group limiter refused: fall back to the group's warn limiter.
        RateLimiter groupWarnLimiter = groupWarnCount.get(group);
        if (groupWarnLimiter == null) {
            LogUtils.logError("TpsLimiter::tryEnter", "can not find group in groupWarnCount, Group =" + group);
            return false;
        }
        if (!groupWarnLimiter.tryAcquire()) {
            return false;
        }
    }

    if (totalCount.tryAcquire()) {
        return true;
    }

    LogUtils.logWarn("TpsLimiter::tryEnter",
            String.format("Total TPS is over warning threshold(%f)!", totalLimit * warningRatio));
    MetricUtils.incWarnLimitCounter(group);
    return totalWarnCount.tryAcquire();
}
/**
 * Tries to take {@code permits} permits from the limiter registered under {@code key}.
 *
 * @param key     lookup key into {@code rateLimiterMap}
 * @param permits number of permits wanted; {@code null} or non-positive values always pass
 * @return {@code true} if nothing needs to be taken, if no limiter is registered for the key,
 *         or if the limiter granted the permits; {@code false} if the limiter refused
 */
@Override
public boolean takePermits(String key, Integer permits) {
    boolean nothingRequested = permits == null || permits <= 0;
    if (nothingRequested) {
        return true;
    }
    final RateLimiter limiter = rateLimiterMap.get(key);
    // A missing limiter means no rate limit is applied to this key.
    return limiter == null || limiter.tryAcquire(permits);
}
/**
 * Admission check for a single call belonging to {@code group}.
 *
 * <p>Order of checks: the group must be present in {@code groupLimits}; the group limiter (or,
 * once it refuses, the group warn limiter) must grant a permit; finally the total limiter (or,
 * once it refuses, the total warn limiter) must grant a permit. Each refusal by a regular
 * limiter is logged as a warning and counted through {@code MetricUtils}.
 *
 * @param group group name
 * @return {@code true} when the call may proceed, {@code false} otherwise
 */
@Override
public boolean tryEnter(String group) {
    if (!groupLimits.containsKey(group)) {
        LogUtils.logError("TpsLimiter::tryEnter", "can not find group in groupLimits, Group =" + group);
        return false;
    }

    RateLimiter limiter = groupCount.get(group);
    if (limiter == null) {
        LogUtils.logError("TpsLimiter::tryEnter", "can not find group in groupCount, Group =" + group);
        return false;
    }

    boolean groupGranted = limiter.tryAcquire();
    if (!groupGranted) {
        String groupWarning =
                String.format("Group %s TPS is over warning threshold(%f)!", group, groupLimits.get(group) * warningRatio);
        LogUtils.logWarn("TpsLimiter::tryEnter", groupWarning);
        MetricUtils.incWarnLimitCounter(group);

        // Second chance: the warn limiter of the same group.
        limiter = groupWarnCount.get(group);
        if (limiter == null) {
            LogUtils.logError("TpsLimiter::tryEnter", "can not find group in groupWarnCount, Group =" + group);
            return false;
        }
        if (!limiter.tryAcquire()) {
            return false;
        }
    }

    boolean totalGranted = totalCount.tryAcquire();
    if (totalGranted) {
        return true;
    }

    String totalWarning = String.format("Total TPS is over warning threshold(%f)!", totalLimit * warningRatio);
    LogUtils.logWarn("TpsLimiter::tryEnter", totalWarning);
    MetricUtils.incWarnLimitCounter(group);
    return totalWarnCount.tryAcquire();
}
/**
 * A limiter created with an infinite rate must hand out 10000 permits within a 100 ms timeout.
 */
@Test
public void testInfiniteRateLimitPossible() {
    RateLimiter unlimited = RateLimiter.create(Double.POSITIVE_INFINITY);
    Assert.assertTrue(unlimited.tryAcquire(10000, Duration.ofMillis(100)));
}
/**
 * Checks the produce quotas of a topic partition: the QPS limiter first, then the bytes
 * limiter. A Cat event named {@code topic-partition} is logged for the limiter that refuses.
 *
 * @param topic     topic name
 * @param partition partition number
 * @param msgCount  permits requested from the QPS limiter
 * @param bytes     permits requested from the bytes limiter
 * @return {@code false} when both limiters grant the permits, {@code true} otherwise
 */
private boolean isRateLimitExceeded(String topic, int partition, int msgCount, int bytes) {
    RateLimiter messageLimiter = m_config.getPartitionProduceQPSRateLimiter(topic, partition);
    RateLimiter byteLimiter = m_config.getPartitionProduceBytesRateLimiter(topic, partition);

    if (!messageLimiter.tryAcquire(msgCount)) {
        Cat.logEvent(CatConstants.TYPE_MESSAGE_BROKER_QPS_RATE_LIMIT_EXCEED, topic + "-" + partition);
        return true;
    }
    if (!byteLimiter.tryAcquire(bytes)) {
        Cat.logEvent(CatConstants.TYPE_MESSAGE_BROKER_BYTES_RATE_LIMIT_EXCEED, topic + "-" + partition);
        return true;
    }
    return false;
}
/**
 * Token-bucket interception: invokes the target when the resource's limiter grants a permit
 * and runs the fallback otherwise.
 *
 * @param rate     rate passed to {@code LimiterManager} when obtaining the limiter
 * @param resource resource name identifying the limiter
 * @param fallback fallback descriptor forwarded to {@code doExecFallback}
 * @param inv      the intercepted invocation
 */
private void doInterceptForTokenBucket(int rate, String resource, String fallback, Invocation inv) {
    RateLimiter bucket = LimiterManager.me().getOrCreateRateLimiter(resource, rate);
    if (!bucket.tryAcquire()) {
        // Not allowed through: hand over to the fallback.
        doExecFallback(resource, fallback, inv);
        return;
    }
    // Allowed through.
    inv.invoke();
}
/**
 * Decides whether a request from {@code ip} to {@code endpoint} may proceed, charging
 * {@code count} permits against the limiter cached for that (ip, endpoint) pair.
 *
 * <p>If the permits are not available immediately, a warning is logged and the call waits up
 * to one second for them before giving up.
 *
 * @param ip       client address, part of the cache key
 * @param endpoint endpoint name, part of the cache key
 * @param count    number of permits the request costs; values below 1 are charged as 1
 * @return {@code true} if the permits were obtained, {@code false} if they were still
 *         unavailable after the one-second wait
 */
private boolean isRequestPermittedWithCount(String ip, String endpoint, int count) {
    CacheKey key = new CacheKey(ip, endpoint);
    RateLimiter limit = rateLimits.getUnchecked(key);
    // RateLimiter rejects non-positive permit counts with an exception; charging at least one
    // permit keeps such calls working the way they did when a single permit was always taken.
    int permits = Math.max(1, count);
    if (limit.tryAcquire(permits)) {
        return true;
    }
    // didn't get it right away, log, wait for 1 second, then fail request
    logger.warn("rate limited; waiting for up to 1 second (ip {}, endpoint {})", ip, endpoint);
    // The former call tryAcquire(count, TimeUnit.SECONDS) resolved to the (timeout, unit)
    // overload: it waited `count` seconds for ONE permit. Use the three-argument overload so
    // `count` is the permit cost and the wait is the advertised one second.
    return limit.tryAcquire(permits, 1, TimeUnit.SECONDS);
}
/**
 * Maps the outcome of a non-blocking permit request onto one of two states.
 *
 * @param rateLimiter  limiter to consult; {@code null} counts as success
 * @param successState state returned when there is no limiter or a permit was obtained
 * @param failState    state returned when the limiter refused
 * @return {@code successState} or {@code failState} as described above
 */
private State getState(RateLimiter rateLimiter, State successState, State failState) {
    // Short-circuit: tryAcquire() is only called when a limiter is actually present.
    boolean permitted = rateLimiter == null || rateLimiter.tryAcquire();
    return permitted ? successState : failState;
}
/**
 * After one permit has been taken from a limiter created with rate 1, asking for two more
 * permits with a 10 ms timeout must come back {@code false} rather than block.
 */
@Test
public void givenLimitedResource_whenTryAcquire_shouldNotBlockIndefinitely() {
    // given: a limiter created with rate 1 from which one permit has already been acquired
    RateLimiter limiter = RateLimiter.create(1);
    limiter.acquire();

    // when: two permits are requested with a 10 ms timeout
    boolean acquiredInTime = limiter.tryAcquire(2, 10, TimeUnit.MILLISECONDS);

    // then
    assertThat(acquiredInTime).isFalse();
}
/**
 * Tries to take {@code permits} permits from the limiter registered for {@code topic}.
 *
 * @param topic   key into {@code httpRateLimiterMap}
 * @param permits number of permits to request
 * @return {@code true} if no limiter is registered for the topic or the permits were granted;
 *         {@code false} if the limiter refused
 */
public boolean tryAcquire(String topic, int permits) {
    RateLimiter topicLimiter = httpRateLimiterMap.get(topic);
    if (topicLimiter == null) {
        // No limiter registered for this topic: nothing to enforce.
        return true;
    }
    return topicLimiter.tryAcquire(permits);
}
/**
 * Non-blocking permit request against the limiter stored for {@code topic}.
 *
 * @param topic   key into {@code httpRateLimiterMap}
 * @param permits number of permits to request
 * @return {@code false} only when a limiter exists for the topic and refuses the permits
 */
public boolean tryAcquire(String topic, int permits) {
    final RateLimiter registered = httpRateLimiterMap.get(topic);
    // Topics without a registered limiter always pass.
    return registered != null ? registered.tryAcquire(permits) : true;
}