下面列出了 com.google.common.util.concurrent.RateLimiter#getRate() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds an {@code IFace} that throttles dequeues to the given rate.
 *
 * <p>A rate that is zero or negative yields the shared {@code ALLOW_NONE} instance; any other
 * value is handed to {@link RateLimiter#create(double)}.
 *
 * @param maxRequestsPerSecond the permitted number of requests per second
 * @return {@code ALLOW_NONE} for non-positive rates, otherwise an {@code IFace} backed by a
 *     dedicated {@link RateLimiter}
 */
public static IFace create(double maxRequestsPerSecond) {
  // Guard: non-positive rates never reach RateLimiter.create.
  if (maxRequestsPerSecond <= 0.0) {
    return ALLOW_NONE;
  }

  final RateLimiter limiter = RateLimiter.create(maxRequestsPerSecond);

  return new IFace() {
    /** Non-blocking attempt to take {@code numJobs} permits from the backing limiter. */
    @Override
    public boolean allowDequeue(int numJobs) {
      return limiter.tryAcquire(numJobs);
    }

    /** Reports the rate currently configured on the backing limiter. */
    @Override
    public double getRate() {
      return limiter.getRate();
    }
  };
}
/**
 * Records a hit for the table and attempts to take one permit from its rate limiter.
 *
 * <p>When {@code _brokerMetrics} is non-null, the qps capacity utilization (hit count as a
 * percentage of the limiter's rate) is published as a table gauge before the permit is requested.
 *
 * @param tableNameWithType table name with type.
 * @param queryQuotaEntity query quota entity for type-specific table.
 * @return {@code true} if a permit was acquired, {@code false} if the quota is exceeded.
 */
private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity queryQuotaEntity) {
  // Count this query before reading any figures so it is reflected in the hit count below.
  queryQuotaEntity.getHitCounter().hit();

  final RateLimiter limiter = queryQuotaEntity.getRateLimiter();
  final double perBrokerRate = limiter.getRate();
  final int hitCount = queryQuotaEntity.getHitCounter().getHitCount();

  // Publish the qps capacity utilization rate, if metrics are available.
  if (_brokerMetrics != null) {
    final int utilizationPercent = (int) (hitCount * 100 / perBrokerRate);
    LOGGER.debug("The percentage of rate limit capacity utilization is {}", utilizationPercent);
    _brokerMetrics.setValueOfTableGauge(tableNameWithType, BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE,
        utilizationPercent);
  }

  final boolean acquired = limiter.tryAcquire();
  if (!acquired) {
    LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. Current qps: {}", tableNameWithType,
        perBrokerRate, hitCount);
  }
  return acquired;
}
/**
 * Looks up the rate configured for a topic's limiter.
 *
 * @param topic key into {@code httpRateLimiterMap}
 * @return the limiter's current rate, or {@code 0} when no limiter is registered for the topic
 */
public double getRate(String topic) {
  final RateLimiter topicLimiter = httpRateLimiterMap.get(topic);
  // No registered limiter is reported as a rate of zero.
  return (topicLimiter == null) ? 0 : topicLimiter.getRate();
}
/**
 * Returns the rate of the limiter registered for {@code topic}.
 *
 * @param topic key into {@code httpRateLimiterMap}
 * @return the limiter's current rate, or {@code 0} if the map holds no limiter for the topic
 */
public double getRate(String topic) {
  final RateLimiter registered = httpRateLimiterMap.get(topic);
  if (registered == null) {
    // Unknown topic: report zero rather than failing.
    return 0;
  }
  return registered.getRate();
}
/**
 * Returns the QPS rate limiter for a topic partition, creating it on first use and re-tuning it
 * when the configured limit differs from the limiter's current rate.
 *
 * <p>The limit is resolved through {@code getLimit(m_topicQPSRateLimits.get(), topic)}. A missing
 * limiter is created and registered while holding the lock on
 * {@code m_topicPartitionQPSRateLimiters}; an existing limiter is adjusted while holding the lock
 * on the limiter itself.
 *
 * @param topic the topic whose limit applies
 * @param partition the partition within the topic
 * @return the limiter registered for the (topic, partition) pair
 */
public RateLimiter getPartitionProduceQPSRateLimiter(String topic,
    int partition) {
  Double limit = getLimit(m_topicQPSRateLimits.get(), topic);
  Pair<String, Integer> tp = new Pair<String, Integer>(topic, partition);
  RateLimiter rateLimiter = m_topicPartitionQPSRateLimiters.get(tp);
  if (rateLimiter == null) {
    synchronized (m_topicPartitionQPSRateLimiters) {
      // Re-check under the lock: another thread may have registered the limiter meanwhile.
      rateLimiter = m_topicPartitionQPSRateLimiters.get(tp);
      if (rateLimiter == null) {
        rateLimiter = RateLimiter.create(limit);
        m_topicPartitionQPSRateLimiters.put(tp, rateLimiter);
        log.info(
            "Set single partition's qps rate limit to {} for topic {} and partition {}",
            limit, topic, partition);
      }
    }
  } else {
    synchronized (rateLimiter) {
      // Compare with a small relative tolerance instead of exact inequality: the value reported
      // by getRate() may not be bit-identical to the value passed to setRate()/create(), and an
      // exact comparison would then reset the rate and log a "changed" message on every call.
      double currentRate = rateLimiter.getRate();
      double tolerance = 1e-9 * Math.max(1.0, Math.abs(limit));
      if (Math.abs(currentRate - limit) > tolerance) {
        rateLimiter.setRate(limit);
        log.info(
            "Single partition's qps rate limit changed to {} for topic {} and partition {}",
            limit, topic, partition);
      }
    }
  }
  return rateLimiter;
}
/**
 * Returns the bytes rate limiter for a topic partition, creating it on first use and re-tuning
 * it when the configured limit differs from the limiter's current rate.
 *
 * <p>The limit is resolved through {@code getLimit(m_topicBytesRateLimits.get(), topic)}. A
 * missing limiter is created and registered while holding the lock on
 * {@code m_topicPartitionBytesRateLimiters}; an existing limiter is adjusted while holding the
 * lock on the limiter itself.
 *
 * @param topic the topic whose limit applies
 * @param partition the partition within the topic
 * @return the limiter registered for the (topic, partition) pair
 */
public RateLimiter getPartitionProduceBytesRateLimiter(String topic,
    int partition) {
  Double limit = getLimit(m_topicBytesRateLimits.get(), topic);
  Pair<String, Integer> tp = new Pair<String, Integer>(topic, partition);
  RateLimiter rateLimiter = m_topicPartitionBytesRateLimiters.get(tp);
  if (rateLimiter == null) {
    synchronized (m_topicPartitionBytesRateLimiters) {
      // Re-check under the lock: another thread may have registered the limiter meanwhile.
      rateLimiter = m_topicPartitionBytesRateLimiters.get(tp);
      if (rateLimiter == null) {
        rateLimiter = RateLimiter.create(limit);
        m_topicPartitionBytesRateLimiters.put(tp, rateLimiter);
        log.info(
            "Set single partition's bytes rate limit to {} for topic {}",
            limit, topic);
      }
    }
  } else {
    synchronized (rateLimiter) {
      // Compare with a small relative tolerance instead of exact inequality: the value reported
      // by getRate() may not be bit-identical to the value passed to setRate()/create(), and an
      // exact comparison would then reset the rate and log a "changed" message on every call.
      double currentRate = rateLimiter.getRate();
      double tolerance = 1e-9 * Math.max(1.0, Math.abs(limit));
      if (Math.abs(currentRate - limit) > tolerance) {
        rateLimiter.setRate(limit);
        log.info(
            "Single partition's bytes rate limit changed to {} for topic {}",
            limit, topic);
      }
    }
  }
  return rateLimiter;
}
/**
 * Returns the rate limiter cached for {@code resource}, creating it if absent and applying
 * {@code rate} to it if the cached limiter is configured with a different rate.
 *
 * <p>Previously a cached limiter whose rate differed from {@code rate} was detected but returned
 * unchanged, so a new rate never took effect. The existing instance is now updated in place via
 * {@link RateLimiter#setRate(double)}, so callers already holding a reference see the new rate.
 *
 * @param resource cache key; the lock used for creation/update is {@code resource.intern()}
 * @param rate desired permits per second
 * @return the limiter registered for {@code resource}, configured with {@code rate}
 * @throws IllegalArgumentException if {@code rate} is rejected by {@link RateLimiter}
 *     (for example when it is not positive)
 */
public RateLimiter getOrCreateRateLimiter(String resource, int rate) {
  // Relative tolerance: getRate() may not return a value bit-identical to the configured rate,
  // so an exact comparison could force the slow path (and a redundant setRate) on every call.
  final double tolerance = 1e-9 * Math.max(1.0, Math.abs((double) rate));
  RateLimiter limiter = rateLimiterCache.get(resource);
  if (limiter == null || Math.abs(limiter.getRate() - rate) > tolerance) {
    synchronized (resource.intern()) {
      // Re-read under the lock: another thread may have created or updated the limiter.
      limiter = rateLimiterCache.get(resource);
      if (limiter == null) {
        limiter = RateLimiter.create(rate);
        rateLimiterCache.put(resource, limiter);
      } else if (Math.abs(limiter.getRate() - rate) > tolerance) {
        // The requested rate changed since the limiter was cached: apply it.
        limiter.setRate(rate);
      }
    }
  }
  return limiter;
}
/**
 * Synchronizes the limiter's rate with the configured submission rate limit.
 *
 * <p>The configured value falls back to {@code StyxScheduler.DEFAULT_SUBMISSION_RATE_PER_SEC}
 * when absent. The limiter is only touched when the configured value differs from its current
 * rate by at least {@code 0.1}. Any exception raised along the way is logged as a warning and
 * the update is skipped.
 *
 * @param config supplier of the current {@code StyxConfig}
 * @param rateLimiter the limiter to adjust
 */
private static void updateRuntimeConfig(Supplier<StyxConfig> config, RateLimiter rateLimiter) {
  try {
    final double previousRate = rateLimiter.getRate();
    final Double configuredRate = config.get().submissionRateLimit().orElse(
        StyxScheduler.DEFAULT_SUBMISSION_RATE_PER_SEC);

    // Ignore differences smaller than 0.1 so that negligible changes do not reset the limiter.
    final boolean unchanged = Math.abs(configuredRate - previousRate) < 0.1;
    if (unchanged) {
      return;
    }

    LOG.info("Updating submission rate limit: {} -> {}", previousRate, configuredRate);
    rateLimiter.setRate(configuredRate);
  } catch (Exception e) {
    LOG.warn("Failed to fetch the submission rate config from storage, "
        + "skipping RateLimiter update", e);
  }
}
/**
 * Applies a throughput limit to the limiter if it differs from the limiter's current rate.
 *
 * @param limit the requested throughput; {@code 0} disables throttling and is translated to
 *     {@code Double.MAX_VALUE}
 * @param rateLimiter the limiter to adjust
 */
private void mayUpdateThroughput(double limit, RateLimiter rateLimiter)
{
    // if throughput is set to 0, throttling is disabled
    final double effectiveLimit = (limit == 0) ? Double.MAX_VALUE : limit;

    // NOTE(review): this is an exact floating-point comparison against getRate().
    final boolean alreadyApplied = rateLimiter.getRate() == effectiveLimit;
    if (!alreadyApplied)
    {
        rateLimiter.setRate(effectiveLimit);
    }
}